Adds TTL support to the CockroachDB state. (#2618)

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Josh van Leeuwen 2023-03-03 14:36:40 +00:00 committed by GitHub
parent c6d6678fbc
commit 4827d5d119
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 688 additions and 118 deletions

View File

@ -117,3 +117,9 @@ func (c *CockroachDB) GetComponentMetadata() map[string]string {
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
return metadataInfo
}
// GetDBAccess returns the dbaccess property.
// This method is used in tests.
func (c *CockroachDB) GetDBAccess() dbAccess {
return c.dbaccess
}

View File

@ -22,126 +22,209 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
"github.com/dapr/components-contrib/state/utils"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
"github.com/dapr/kit/retry"
// Blank import for the underlying PostgreSQL driver.
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
"github.com/dapr/components-contrib/state/utils"
)
const (
cleanupIntervalKey = "cleanupIntervalInSeconds"
connectionStringKey = "connectionString"
errMissingConnectionString = "missing connection string"
tableName = "state"
defaultTableName = "state"
defaultMetadataTableName = "dapr_metadata"
defaultMaxConnectionAttempts = 5 // A bad driver connection error can occur inside the sql code so this essentially allows for more retries since the sql code does not allow that to be changed
defaultCleanupInterval = time.Hour
)
// cockroachDBAccess implements dbaccess.
type cockroachDBAccess struct {
// CockroachDBAccess implements dbaccess.
// It owns the database handle used by the state store and the bookkeeping
// needed to stop the background cleanup goroutine on Close.
type CockroachDBAccess struct {
// logger receives the debug, info, warning and error output of the store.
logger logger.Logger
// metadata holds the parsed component metadata; it is populated by Init.
metadata cockroachDBMetadata
// db is the database handle opened by Init through the "pgx" driver.
db *sql.DB
// connectionString is copied from metadata.ConnectionString during Init.
connectionString string
// closeCh is closed by Close to signal the cleanup goroutine to stop.
closeCh chan struct{}
// closed ensures closeCh is closed only once.
closed atomic.Bool
// wg tracks the cleanup goroutine started by Init so Close can wait for it.
wg sync.WaitGroup
}
// cockroachDBMetadata is the parsed configuration of the CockroachDB state store.
type cockroachDBMetadata struct {
// ConnectionString is required; parseMetadata fails when it is empty.
ConnectionString string
// TableName is the state table; parseMetadata falls back to defaultTableName when empty.
TableName string
// MetadataTableName is the table holding the 'last-cleanup' marker;
// parseMetadata falls back to defaultMetadataTableName when empty.
MetadataTableName string
// CleanupInterval is how often expired rows are removed.
// parseMetadata defaults it to defaultCleanupInterval and sets it to nil
// (cleanup disabled) when cleanupIntervalInSeconds is zero or negative.
CleanupInterval *time.Duration
// MaxConnectionAttempts overrides defaultMaxConnectionAttempts in Ping when
// it is set and non-negative.
MaxConnectionAttempts *int
}
// newCockroachDBAccess creates a new instance of cockroachDBAccess.
func newCockroachDBAccess(logger logger.Logger) *cockroachDBAccess {
// dbquerier is the interface that contains methods for querying.
// It lets the set, get and delete helpers run either directly against the
// database handle or inside a transaction, since both are passed in as a
// dbquerier by the callers in this file.
type dbquerier interface {
ExecContext(context.Context, string, ...any) (sql.Result, error)
QueryContext(context.Context, string, ...any) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...any) *sql.Row
}
// newCockroachDBAccess creates a new instance of CockroachDBAccess.
func newCockroachDBAccess(logger logger.Logger) *CockroachDBAccess {
logger.Debug("Instantiating new CockroachDB state store")
return &cockroachDBAccess{
return &CockroachDBAccess{
logger: logger,
metadata: cockroachDBMetadata{},
db: nil,
connectionString: "",
closeCh: make(chan struct{}),
}
}
func parseMetadata(meta state.Metadata) (*cockroachDBMetadata, error) {
m := cockroachDBMetadata{}
metadata.DecodeMetadata(meta.Properties, &m)
m := cockroachDBMetadata{
CleanupInterval: ptr.Of(defaultCleanupInterval),
}
if err := metadata.DecodeMetadata(meta.Properties, &m); err != nil {
return nil, err
}
if m.ConnectionString == "" {
return nil, errors.New(errMissingConnectionString)
}
if len(m.TableName) == 0 {
m.TableName = defaultTableName
}
if len(m.MetadataTableName) == 0 {
m.MetadataTableName = defaultMetadataTableName
}
// Cleanup interval
s, ok := meta.Properties[cleanupIntervalKey]
if ok && s != "" {
cleanupIntervalInSec, err := strconv.ParseInt(s, 10, 0)
if err != nil {
return nil, fmt.Errorf("invalid value for '%s': %s", cleanupIntervalKey, s)
}
// Non-positive value from meta means disable auto cleanup.
if cleanupIntervalInSec > 0 {
m.CleanupInterval = ptr.Of(time.Duration(cleanupIntervalInSec) * time.Second)
} else {
m.CleanupInterval = nil
}
}
return &m, nil
}
// Init sets up CockroachDB connection and ensures that the state table exists.
func (p *cockroachDBAccess) Init(ctx context.Context, metadata state.Metadata) error {
p.logger.Debug("Initializing CockroachDB state store")
func (c *CockroachDBAccess) Init(ctx context.Context, metadata state.Metadata) error {
c.logger.Debug("Initializing CockroachDB state store")
meta, err := parseMetadata(metadata)
if err != nil {
return err
}
p.metadata = *meta
c.metadata = *meta
if p.metadata.ConnectionString == "" {
p.logger.Error("Missing CockroachDB connection string")
if c.metadata.ConnectionString == "" {
c.logger.Error("Missing CockroachDB connection string")
return fmt.Errorf(errMissingConnectionString)
} else {
p.connectionString = p.metadata.ConnectionString
c.connectionString = c.metadata.ConnectionString
}
databaseConn, err := sql.Open("pgx", p.connectionString)
databaseConn, err := sql.Open("pgx", c.connectionString)
if err != nil {
p.logger.Error(err)
c.logger.Error(err)
return err
}
p.db = databaseConn
c.db = databaseConn
if err = databaseConn.PingContext(ctx); err != nil {
return err
}
if err = p.ensureStateTable(tableName); err != nil {
if err = c.ensureStateTable(ctx, c.metadata.TableName); err != nil {
return err
}
if err = c.ensureMetadataTable(ctx, c.metadata.MetadataTableName); err != nil {
return err
}
// Ensure that a connection to the database is actually established
err = p.Ping(ctx)
if err != nil {
if err = c.Ping(ctx); err != nil {
return err
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.scheduleCleanup(ctx)
}()
return nil
}
// Set makes an insert or update to the database.
func (p *cockroachDBAccess) Set(ctx context.Context, req *state.SetRequest) error {
p.logger.Debug("Setting state value in CockroachDB")
func (c *CockroachDBAccess) Set(ctx context.Context, req *state.SetRequest) error {
return c.set(ctx, c.db, req)
}
func (c *CockroachDBAccess) set(ctx context.Context, d dbquerier, req *state.SetRequest) error {
c.logger.Debug("Setting state value in CockroachDB")
value, isBinary, err := validateAndReturnValue(req)
if err != nil {
return err
}
var result sql.Result
// TTL
var ttlSeconds int
ttl, ttlerr := utils.ParseTTL(req.Metadata)
if ttlerr != nil {
return fmt.Errorf("error parsing TTL: %w", ttlerr)
}
if ttl != nil {
ttlSeconds = *ttl
}
var (
query string
ttlQuery string
params []any
)
// Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
if req.ETag == nil {
result, err = p.db.ExecContext(ctx, fmt.Sprintf(
`INSERT INTO %s (key, value, isbinary, etag) VALUES ($1, $2, $3, 1)
ON CONFLICT (key) DO UPDATE SET value = $2, isbinary = $3, updatedate = NOW(), etag = EXCLUDED.etag + 1;`,
tableName), req.Key, value, isBinary)
query = `
INSERT INTO %[1]s
(key, value, isbinary, etag, expiredate)
VALUES
($1, $2, $3, 1, %[2]s)
ON CONFLICT (key) DO UPDATE SET
value = $2,
isbinary = $3,
updatedate = NOW(),
etag = EXCLUDED.etag + 1,
expiredate = %[2]s
;`
params = []any{req.Key, value, isBinary}
} else {
var etag64 uint64
etag64, err = strconv.ParseUint(*req.ETag, 10, 32)
@ -151,12 +234,27 @@ func (p *cockroachDBAccess) Set(ctx context.Context, req *state.SetRequest) erro
etag := uint32(etag64)
// When an etag is provided do an update - no insert.
result, err = p.db.ExecContext(ctx, fmt.Sprintf(
`UPDATE %s SET value = $1, isbinary = $2, updatedate = NOW(), etag = etag + 1
WHERE key = $3 AND etag = $4;`,
tableName), value, isBinary, req.Key, etag)
query = `
UPDATE %[1]s
SET
value = $1,
isbinary = $2,
updatedate = NOW(),
etag = etag + 1,
expiredate = %[2]s
WHERE
key = $3 AND etag = $4
;`
params = []any{value, isBinary, req.Key, etag}
}
if ttlSeconds > 0 {
ttlQuery = "CURRENT_TIMESTAMP + INTERVAL '" + strconv.Itoa(ttlSeconds) + " seconds'"
} else {
ttlQuery = "NULL"
}
result, err := d.ExecContext(ctx, fmt.Sprintf(query, c.metadata.TableName, ttlQuery), params...)
if err != nil {
return err
}
@ -173,9 +271,9 @@ func (p *cockroachDBAccess) Set(ctx context.Context, req *state.SetRequest) erro
return nil
}
func (p *cockroachDBAccess) BulkSet(ctx context.Context, req []state.SetRequest) error {
p.logger.Debug("Executing BulkSet request")
tx, err := p.db.Begin()
func (c *CockroachDBAccess) BulkSet(ctx context.Context, req []state.SetRequest) error {
c.logger.Debug("Executing BulkSet request")
tx, err := c.db.Begin()
if err != nil {
return err
}
@ -183,7 +281,7 @@ func (p *cockroachDBAccess) BulkSet(ctx context.Context, req []state.SetRequest)
if len(req) > 0 {
for _, s := range req {
sa := s // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Set(ctx, &sa)
err = c.set(ctx, tx, &sa)
if err != nil {
tx.Rollback()
@ -198,8 +296,12 @@ func (p *cockroachDBAccess) BulkSet(ctx context.Context, req []state.SetRequest)
}
// Get returns data from the database. If data does not exist for the key an empty state.GetResponse will be returned.
func (p *cockroachDBAccess) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
p.logger.Debug("Getting state value from CockroachDB")
func (c *CockroachDBAccess) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
return c.get(ctx, c.db, req)
}
func (c *CockroachDBAccess) get(ctx context.Context, d dbquerier, req *state.GetRequest) (*state.GetResponse, error) {
c.logger.Debug("Getting state value from CockroachDB")
if req.Key == "" {
return nil, fmt.Errorf("missing key in get operation")
@ -208,7 +310,15 @@ func (p *cockroachDBAccess) Get(ctx context.Context, req *state.GetRequest) (*st
var value string
var isBinary bool
var etag int
err := p.db.QueryRowContext(ctx, fmt.Sprintf("SELECT value, isbinary, etag FROM %s WHERE key = $1", tableName), req.Key).Scan(&value, &isBinary, &etag)
query := `
SELECT
value, isbinary, etag
FROM %s
WHERE
key = $1
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)
;`
err := d.QueryRowContext(ctx, fmt.Sprintf(query, c.metadata.TableName), req.Key).Scan(&value, &isBinary, &etag)
if err != nil {
// If no rows exist, return an empty response, otherwise return the error.
if errors.Is(err, sql.ErrNoRows) {
@ -247,8 +357,12 @@ func (p *cockroachDBAccess) Get(ctx context.Context, req *state.GetRequest) (*st
}
// Delete removes an item from the state store.
func (p *cockroachDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) error {
p.logger.Debug("Deleting state value from CockroachDB")
func (c *CockroachDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) error {
return c.delete(ctx, c.db, req)
}
func (c *CockroachDBAccess) delete(ctx context.Context, d dbquerier, req *state.DeleteRequest) error {
c.logger.Debug("Deleting state value from CockroachDB")
if req.Key == "" {
return fmt.Errorf("missing key in delete operation")
@ -258,7 +372,7 @@ func (p *cockroachDBAccess) Delete(ctx context.Context, req *state.DeleteRequest
var err error
if req.ETag == nil {
result, err = p.db.ExecContext(ctx, "DELETE FROM state WHERE key = $1", req.Key)
result, err = d.ExecContext(ctx, "DELETE FROM state WHERE key = $1", req.Key)
} else {
var etag64 uint64
etag64, err = strconv.ParseUint(*req.ETag, 10, 32)
@ -267,7 +381,7 @@ func (p *cockroachDBAccess) Delete(ctx context.Context, req *state.DeleteRequest
}
etag := uint32(etag64)
result, err = p.db.ExecContext(ctx, "DELETE FROM state WHERE key = $1 and etag = $2", req.Key, etag)
result, err = d.ExecContext(ctx, "DELETE FROM state WHERE key = $1 and etag = $2", req.Key, etag)
}
if err != nil {
@ -286,9 +400,9 @@ func (p *cockroachDBAccess) Delete(ctx context.Context, req *state.DeleteRequest
return nil
}
func (p *cockroachDBAccess) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
p.logger.Debug("Executing BulkDelete request")
tx, err := p.db.Begin()
func (c *CockroachDBAccess) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
c.logger.Debug("Executing BulkDelete request")
tx, err := c.db.Begin()
if err != nil {
return err
}
@ -296,7 +410,7 @@ func (p *cockroachDBAccess) BulkDelete(ctx context.Context, req []state.DeleteRe
if len(req) > 0 {
for _, d := range req {
da := d // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Delete(ctx, &da)
err = c.delete(ctx, tx, &da)
if err != nil {
tx.Rollback()
@ -310,10 +424,10 @@ func (p *cockroachDBAccess) BulkDelete(ctx context.Context, req []state.DeleteRe
return err
}
func (p *cockroachDBAccess) ExecuteMulti(ctx context.Context, request *state.TransactionalStateRequest) error {
p.logger.Debug("Executing PostgreSQL transaction")
func (c *CockroachDBAccess) ExecuteMulti(ctx context.Context, request *state.TransactionalStateRequest) error {
c.logger.Debug("Executing CockroachDB transaction")
tx, err := p.db.Begin()
tx, err := c.db.Begin()
if err != nil {
return err
}
@ -329,7 +443,7 @@ func (p *cockroachDBAccess) ExecuteMulti(ctx context.Context, request *state.Tra
return err
}
err = p.Set(ctx, &setReq)
err = c.set(ctx, tx, &setReq)
if err != nil {
tx.Rollback()
return err
@ -337,14 +451,13 @@ func (p *cockroachDBAccess) ExecuteMulti(ctx context.Context, request *state.Tra
case state.Delete:
var delReq state.DeleteRequest
delReq, err = getDelete(o)
if err != nil {
tx.Rollback()
return err
}
err = p.Delete(ctx, &delReq)
err = c.delete(ctx, tx, &delReq)
if err != nil {
tx.Rollback()
return err
@ -356,20 +469,19 @@ func (p *cockroachDBAccess) ExecuteMulti(ctx context.Context, request *state.Tra
}
}
err = tx.Commit()
return err
return tx.Commit()
}
// Query executes a query against store.
func (p *cockroachDBAccess) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
p.logger.Debug("Getting query value from CockroachDB")
func (c *CockroachDBAccess) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
c.logger.Debug("Getting query value from CockroachDB")
stateQuery := &Query{
query: "",
params: []interface{}{},
limit: 0,
skip: ptr.Of[int64](0),
tableName: c.metadata.TableName,
query: "",
params: []interface{}{},
limit: 0,
skip: ptr.Of[int64](0),
}
qbuilder := query.NewQueryBuilder(stateQuery)
if err := qbuilder.BuildQuery(&req.Query); err != nil {
@ -380,9 +492,9 @@ func (p *cockroachDBAccess) Query(ctx context.Context, req *state.QueryRequest)
}, err
}
p.logger.Debug("Query: " + stateQuery.query)
c.logger.Debug("Query: " + stateQuery.query)
data, token, err := stateQuery.execute(ctx, p.logger, p.db)
data, token, err := stateQuery.execute(ctx, c.logger, c.db)
if err != nil {
return &state.QueryResponse{
Results: []state.QueryItem{},
@ -399,10 +511,10 @@ func (p *cockroachDBAccess) Query(ctx context.Context, req *state.QueryRequest)
}
// Ping implements database ping.
func (p *cockroachDBAccess) Ping(ctx context.Context) error {
func (c *CockroachDBAccess) Ping(ctx context.Context) error {
retryCount := defaultMaxConnectionAttempts
if p.metadata.MaxConnectionAttempts != nil && *p.metadata.MaxConnectionAttempts >= 0 {
retryCount = *p.metadata.MaxConnectionAttempts
if c.metadata.MaxConnectionAttempts != nil && *c.metadata.MaxConnectionAttempts >= 0 {
retryCount = *c.metadata.MaxConnectionAttempts
}
config := retry.DefaultConfig()
config.Policy = retry.PolicyExponential
@ -411,43 +523,82 @@ func (p *cockroachDBAccess) Ping(ctx context.Context) error {
backoff := config.NewBackOff()
return retry.NotifyRecover(func() error {
err := p.db.PingContext(ctx)
err := c.db.PingContext(ctx)
if errors.Is(err, driver.ErrBadConn) {
return fmt.Errorf("error when attempting to establish connection with cockroachDB: %v", err)
}
return nil
}, backoff, func(err error, _ time.Duration) {
p.logger.Debugf("Could not establish connection with cockroachDB. Retrying...: %v", err)
c.logger.Debugf("Could not establish connection with cockroachDB. Retrying...: %v", err)
}, func() {
p.logger.Debug("Successfully established connection with cockroachDB after it previously failed")
c.logger.Debug("Successfully established connection with cockroachDB after it previously failed")
})
}
// Close implements io.Close.
func (p *cockroachDBAccess) Close() error {
if p.db != nil {
return p.db.Close()
func (c *CockroachDBAccess) Close() error {
if c.closed.CompareAndSwap(false, true) {
close(c.closeCh)
}
defer c.wg.Wait()
if c.db != nil {
return c.db.Close()
}
return nil
}
func (p *cockroachDBAccess) ensureStateTable(stateTableName string) error {
exists, err := tableExists(p.db, stateTableName)
func (c *CockroachDBAccess) ensureStateTable(ctx context.Context, stateTableName string) error {
exists, err := tableExists(ctx, c.db, stateTableName)
if err != nil {
return err
}
if !exists {
p.logger.Info("Creating CockroachDB state table")
createTable := fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value jsonb NOT NULL,
isbinary boolean NOT NULL,
etag INT,
insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updatedate TIMESTAMP WITH TIME ZONE NULL);`, stateTableName)
_, err = p.db.Exec(createTable)
c.logger.Info("Creating CockroachDB state table")
_, err = c.db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value jsonb NOT NULL,
isbinary boolean NOT NULL,
etag INT,
insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updatedate TIMESTAMP WITH TIME ZONE NULL,
expiredate TIMESTAMP WITH TIME ZONE NULL,
INDEX expiredate_idx (expiredate)
);`, stateTableName))
if err != nil {
return err
}
}
// If table was created before v1.11.
_, err = c.db.ExecContext(ctx, fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, stateTableName))
if err != nil {
return err
}
_, err = c.db.ExecContext(ctx, fmt.Sprintf(
`CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, stateTableName))
if err != nil {
return err
}
return nil
}
func (c *CockroachDBAccess) ensureMetadataTable(ctx context.Context, metaTableName string) error {
exists, err := tableExists(ctx, c.db, metaTableName)
if err != nil {
return err
}
if !exists {
c.logger.Info("Creating CockroachDB metadata table")
_, err = c.db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
);`, metaTableName))
if err != nil {
return err
}
@ -456,9 +607,9 @@ func (p *cockroachDBAccess) ensureStateTable(stateTableName string) error {
return nil
}
func tableExists(db *sql.DB, tableName string) (bool, error) {
func tableExists(ctx context.Context, db *sql.DB, tableName string) (bool, error) {
exists := false
err := db.QueryRow("SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists)
err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists)
return exists, err
}
@ -516,3 +667,96 @@ func getDelete(req state.TransactionalStateOperation) (state.DeleteRequest, erro
return delReq, nil
}
// scheduleCleanup runs CleanupExpired on every tick of the configured cleanup
// interval. It returns immediately when no positive interval is configured or
// when the store is already closed, and otherwise blocks until ctx is done or
// closeCh is closed. Cleanup errors are logged and do not stop the loop.
func (c *CockroachDBAccess) scheduleCleanup(ctx context.Context) {
	interval := c.metadata.CleanupInterval
	if interval == nil || *interval <= 0 || c.closed.Load() {
		return
	}

	c.logger.Infof("Schedule expired data clean up every %d seconds", int(interval.Seconds()))

	ticker := time.NewTicker(*interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if cleanupErr := c.CleanupExpired(ctx); cleanupErr != nil {
				c.logger.Errorf("Error removing expired data: %v", cleanupErr)
			}
		case <-ctx.Done():
			c.logger.Debug("Stopped background cleanup of expired data")
			return
		case <-c.closeCh:
			c.logger.Debug("Stopping background because CockroachDBAccess is closing")
			return
		}
	}
}
// CleanupExpired deletes every row of the state table whose expiredate is in
// the past.
//
// Before deleting, it claims the 'last-cleanup' marker in the metadata table
// through updateLastCleanup. If the marker cannot be claimed, either because a
// cleanup ran too recently or because the marker query failed, the call is a
// no-op and returns nil. It is also a no-op when no cleanup interval is
// configured (cleanup disabled).
//
// An error is returned only when the DELETE itself fails or the number of
// affected rows cannot be read.
func (c *CockroachDBAccess) CleanupExpired(ctx context.Context) error {
	// This method is exported and may be called directly (e.g. from tests) even
	// when cleanup is disabled, in which case CleanupInterval is nil.
	// Guard the dereference instead of panicking.
	interval := c.metadata.CleanupInterval
	if interval == nil {
		c.logger.Debug("Cleanup of expired data is disabled")
		return nil
	}

	// Check if the last iteration was too recent
	// This performs an atomic operation, so allows coordination with other daprd
	// processes too
	canContinue, err := c.updateLastCleanup(ctx, *interval)
	if err != nil {
		// Log errors only: cleanup is best-effort, and the next tick retries.
		// Return here so the "too recently" message below is not logged for
		// what was actually a query failure.
		c.logger.Warnf("Failed to read last cleanup time from database: %v", err)
		return nil
	}
	if !canContinue {
		c.logger.Debug("Last cleanup was performed too recently")
		return nil
	}

	// Note we're not using the transaction here as we don't want this to be
	// rolled back half-way or to lock the table unnecessarily.
	// Need to use fmt.Sprintf because we can't parametrize a table name.
	// Note we are not setting a timeout here as this query can take a "long"
	// time, especially if there's no index on expiredate.
	//nolint:gosec
	stmt := fmt.Sprintf(`DELETE FROM %s WHERE expiredate IS NOT NULL AND expiredate < CURRENT_TIMESTAMP`, c.metadata.TableName)
	res, err := c.db.ExecContext(ctx, stmt)
	if err != nil {
		return fmt.Errorf("failed to execute query: %w", err)
	}

	rows, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}

	c.logger.Infof("Removed %d expired rows", rows)
	return nil
}
// updateLastCleanup tries to claim the 'last-cleanup' marker in the metadata
// table. The row is inserted when it does not exist yet; when it exists, it is
// overwritten with the current timestamp only if the stored timestamp is older
// than cleanupInterval (minus a 100ms buffer).
// Returns true if a row was written, which means that the cleanup can proceed.
func (c *CockroachDBAccess) updateLastCleanup(ctx context.Context, cleanupInterval time.Duration) (bool, error) {
	// The table name cannot be passed as a query parameter, so it is formatted in.
	stmt := fmt.Sprintf(`INSERT INTO %[1]s (key, value)
VALUES ('last-cleanup', CURRENT_TIMESTAMP::STRING)
ON CONFLICT (key)
DO UPDATE SET value = CURRENT_TIMESTAMP::STRING
WHERE (EXTRACT('epoch' FROM CURRENT_TIMESTAMP - %[1]s.value::timestamp with time zone) * 1000)::bigint > $1`,
		c.metadata.MetadataTableName)

	// Subtract 100ms for some buffer
	thresholdMs := cleanupInterval.Milliseconds() - 100

	result, err := c.db.ExecContext(ctx, stmt, thresholdMs)
	if err != nil {
		return false, fmt.Errorf("failed to execute query: %w", err)
	}

	affected, err := result.RowsAffected()
	if err != nil {
		return false, fmt.Errorf("failed to get rows affected: %w", err)
	}

	return affected > 0, nil
}
// GetCleanupInterval returns the cleanupInterval property.
// The result is the pointer stored in the parsed metadata, which is nil when
// cleanup has been disabled.
// This is primarily used for tests.
func (c *CockroachDBAccess) GetCleanupInterval() *time.Duration {
return c.metadata.CleanupInterval
}

View File

@ -28,7 +28,7 @@ import (
type mocks struct {
db *sql.DB
mock sqlmock.Sqlmock
roachDba *cockroachDBAccess
roachDba *CockroachDBAccess
}
func TestGetSetWithWrongType(t *testing.T) {
@ -450,7 +450,7 @@ func mockDatabase(t *testing.T) (*mocks, error) {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
dba := &cockroachDBAccess{
dba := &CockroachDBAccess{
logger: logger,
db: db,
}

View File

@ -19,7 +19,9 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@ -66,7 +68,7 @@ func TestCockroachDBIntegration(t *testing.T) {
t.Run("Create table succeeds", func(t *testing.T) {
t.Parallel()
dbAccess, ok := pgs.dbaccess.(*cockroachDBAccess)
dbAccess, ok := pgs.dbaccess.(*CockroachDBAccess)
assert.True(t, ok)
testCreateTable(t, dbAccess)
@ -146,6 +148,11 @@ func TestCockroachDBIntegration(t *testing.T) {
t.Parallel()
multiWithSetOnly(t, pgs)
})
t.Run("Set with TTL should not return after ttl", func(t *testing.T) {
t.Parallel()
setWithTTLShouldNotReturnAfterTTL(t, pgs)
})
}
// setGetUpdateDeleteOneItem validates setting one item, getting it, and deleting it.
@ -169,22 +176,22 @@ func setGetUpdateDeleteOneItem(t *testing.T, pgs *CockroachDB) {
}
// testCreateTable tests the ability to create the state table.
func testCreateTable(t *testing.T, dba *cockroachDBAccess) {
func testCreateTable(t *testing.T, dba *CockroachDBAccess) {
t.Helper()
tableName := "test_state"
// Drop the table if it already exists.
exists, err := tableExists(dba.db, tableName)
exists, err := tableExists(context.Background(), dba.db, tableName)
assert.Nil(t, err)
if exists {
dropTable(t, dba.db, tableName)
}
// Create the state table and test for its existence.
err = dba.ensureStateTable(tableName)
err = dba.ensureStateTable(context.Background(), tableName)
assert.Nil(t, err)
exists, err = tableExists(dba.db, tableName)
exists, err = tableExists(context.Background(), dba.db, tableName)
assert.Nil(t, err)
assert.True(t, exists)
@ -617,7 +624,7 @@ func testInitConfiguration(t *testing.T) {
err := cockroackDB.Init(context.Background(), metadata)
if rowTest.expectedErr == "" {
assert.Nil(t, err)
assert.NoError(t, err)
} else {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), rowTest.expectedErr)
@ -651,6 +658,37 @@ func setItem(t *testing.T, pgs *CockroachDB, key string, value interface{}, etag
assert.True(t, itemExists)
}
// setWithTTLShouldNotReturnAfterTTL checks that a value stored with a TTL is
// readable before it expires, is no longer returned once the TTL has elapsed,
// can be written again afterwards, and that re-setting it with a longer TTL
// keeps it readable past the original expiry.
func setWithTTLShouldNotReturnAfterTTL(t *testing.T, pgs *CockroachDB) {
	t.Helper()

	key := randomKey()
	initial := &fakeItem{Color: "indigo"}

	// Stored with a 1s TTL: readable right away.
	setItemWithTTL(t, pgs, key, initial, nil, time.Second)
	_, got := getItem(t, pgs, key)
	assert.Equal(t, initial, got)

	// After the TTL has elapsed the store must return an empty item.
	time.Sleep(2 * time.Second)
	resp, got := getItem(t, pgs, key)
	assert.Equal(t, &fakeItem{}, got)

	// The key can be set again using the ETag from the last Get.
	replacement := &fakeItem{Color: "green"}
	setItemWithTTL(t, pgs, key, replacement, resp.ETag, time.Second)
	resp, got = getItem(t, pgs, key)
	assert.Equal(t, replacement, got)

	// Extending the TTL to 5s keeps the value alive past the earlier 1s expiry.
	setItemWithTTL(t, pgs, key, replacement, resp.ETag, 5*time.Second)
	time.Sleep(2 * time.Second)
	resp, got = getItem(t, pgs, key)
	assert.Equal(t, replacement, got)

	deleteItem(t, pgs, key, resp.ETag)
}
func getItem(t *testing.T, pgs *CockroachDB, key string) (*state.GetResponse, *fakeItem) {
t.Helper()
@ -663,7 +701,7 @@ func getItem(t *testing.T, pgs *CockroachDB, key string) (*state.GetResponse, *f
}
response, getErr := pgs.Get(context.Background(), getReq)
assert.Nil(t, getErr)
assert.NoError(t, getErr)
assert.NotNil(t, response)
outputObject := &fakeItem{
Color: "",
@ -699,7 +737,7 @@ func storeItemExists(t *testing.T, key string) bool {
defer databaseConnection.Close()
exists := false
statement := fmt.Sprintf(`SELECT EXISTS (SELECT * FROM %s WHERE key = $1)`, tableName)
statement := fmt.Sprintf(`SELECT EXISTS (SELECT * FROM %s WHERE key = $1)`, defaultTableName)
err = databaseConnection.QueryRow(statement, key).Scan(&exists)
assert.Nil(t, err)
@ -713,12 +751,35 @@ func getRowData(t *testing.T, key string) (returnValue string, insertdate sql.Nu
assert.Nil(t, err)
defer databaseConnection.Close()
err = databaseConnection.QueryRow(fmt.Sprintf("SELECT value, insertdate, updatedate FROM %s WHERE key = $1", tableName), key).Scan(&returnValue, &insertdate, &updatedate)
err = databaseConnection.QueryRow(fmt.Sprintf("SELECT value, insertdate, updatedate FROM %s WHERE key = $1", defaultTableName), key).Scan(&returnValue, &insertdate, &updatedate)
assert.Nil(t, err)
return returnValue, insertdate, updatedate
}
// setItemWithTTL stores value under key with the given ETag and a
// "ttlInSeconds" metadata entry derived from ttl (truncated to whole seconds),
// then asserts that the Set succeeded and that a row for key exists in the
// state table.
func setItemWithTTL(t *testing.T, pgs *CockroachDB, key string, value interface{}, etag *string, ttl time.Duration) {
	t.Helper()

	ttlSeconds := strconv.FormatInt(int64(ttl.Seconds()), 10)

	// Options and ContentType are left at their zero values.
	req := &state.SetRequest{
		Key:      key,
		ETag:     etag,
		Value:    value,
		Metadata: map[string]string{"ttlInSeconds": ttlSeconds},
	}

	err := pgs.Set(context.Background(), req)
	assert.Nil(t, err)

	assert.True(t, storeItemExists(t, key))
}
// randomKey returns a newly generated UUID in string form, used as a state key in tests.
func randomKey() string {
return uuid.New().String()
}

View File

@ -27,10 +27,11 @@ import (
)
type Query struct {
query string
params []interface{}
limit int
skip *int64
tableName string
query string
params []interface{}
limit int
skip *int64
}
func (q *Query) VisitEQ(filter *query.EQ) (string, error) {
@ -96,7 +97,7 @@ func (q *Query) VisitOR(filter *query.OR) (string, error) {
}
func (q *Query) Finalize(filters string, storeQuery *query.Query) error {
q.query = fmt.Sprintf("SELECT key, value, etag FROM %s", tableName)
q.query = fmt.Sprintf("SELECT key, value, etag FROM %s", q.tableName)
if filters != "" {
q.query += fmt.Sprintf(" WHERE %s", filters)

View File

@ -64,10 +64,11 @@ func TestPostgresqlQueryBuildQuery(t *testing.T) {
assert.NoError(t, err)
stateQuery := &Query{
query: "",
params: nil,
limit: 0,
skip: ptr.Of[int64](0),
tableName: defaultTableName,
query: "",
params: nil,
limit: 0,
skip: ptr.Of[int64](0),
}
qbuilder := query.NewQueryBuilder(stateQuery)
err = qbuilder.BuildQuery(&storeQuery)

View File

@ -39,4 +39,13 @@ The purpose of this module is to provide tests that certify the CockroachDB Stat
* Verify that cockroachDB can have state data persist after a restart
* Stop cockroachDB by calling a dockercompose stop command
* Start cockroachDB by calling a dockercompose start command
* Perform a Get Operation using a previously inserted key and verify that the data previously saved before the restart is returned
* Perform a Get Operation using a previously inserted key and verify that the data previously saved before the restart is returned
## TTLs and cleanups
1. Correctly parse the `cleanupIntervalInSeconds` metadata property:
- No value uses the default value (3600 seconds)
- A positive value sets the interval to the given number of seconds
- A zero or negative value disables the cleanup
2. The cleanup method deletes expired records and updates the metadata table with the last time it ran
3. The cleanup method doesn't run if the last iteration was less than `cleanupIntervalInSeconds` seconds ago, or if another process has already performed the cleanup within that interval

View File

@ -15,11 +15,22 @@ package cockroachdb_test
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
goclient "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
state_cockroach "github.com/dapr/components-contrib/state/cockroachdb"
"github.com/dapr/components-contrib/tests/certification/embedded"
@ -27,12 +38,6 @@ import (
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
state_loader "github.com/dapr/dapr/pkg/components/state"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
goclient "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
const (
@ -63,6 +68,22 @@ func TestCockroach(t *testing.T) {
keyOne := uuid.New()
keyOneString := strings.Replace(keyOne.String(), "-", "", -1)
var dbClient *sql.DB
connectStep := func(ctx flow.Context) error {
connCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// Continue re-trying until the context times out, so we can wait for the DB to be up
for {
dbClient, err = sql.Open("pgx", "host=localhost user=root port=26257 connect_timeout=10 database=dapr_test")
if err == nil || connCtx.Err() != nil {
break
}
time.Sleep(750 * time.Millisecond)
}
return err
}
basicTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
@ -167,7 +188,6 @@ func TestCockroach(t *testing.T) {
// Transaction related test - also for Multi
transactionsTest := func(ctx flow.Context) error {
// Set state to allow for a delete operation inside the multi list
err = stateStore.Set(context.Background(), &state.SetRequest{Key: certificationTestPrefix + "key1", Value: []byte("certificationdata")})
assert.NoError(t, err)
@ -250,6 +270,173 @@ func TestCockroach(t *testing.T) {
return nil
}
// ttlTest validates TTL support and garbage collection of expired rows.
// It covers two areas:
//   - parsing of the "cleanupIntervalInSeconds" metadata property;
//   - the cleanup of expired rows, both automatic and triggered by hand.
// Every subtest initialises its own store object against the ttl_state and
// ttl_metadata tables and inspects the tables directly through dbClient.
ttlTest := func(ctx flow.Context) error {
// Metadata shared by the parsing subtests; each one overwrites
// "cleanupIntervalInSeconds" before calling Init.
md := state.Metadata{
Base: metadata.Base{
Name: "ttltest",
Properties: map[string]string{
"connectionString": "host=localhost user=root port=26257 connect_timeout=10 database=dapr_test",
"tableName": "ttl_state",
"metadataTableName": "ttl_metadata",
},
},
}
t.Run("parse cleanupIntervalInSeconds", func(t *testing.T) {
t.Run("default value", func(t *testing.T) {
// Default value is 1 hr; an empty property is expected to select it
md.Properties["cleanupIntervalInSeconds"] = ""
storeObj := state_cockroach.New(log).(*state_cockroach.CockroachDB)
err := storeObj.Init(context.Background(), md)
require.NoError(t, err, "failed to init")
defer storeObj.Close()
dbAccess := storeObj.GetDBAccess().(*state_cockroach.CockroachDBAccess)
require.NotNil(t, dbAccess)
cleanupInterval := dbAccess.GetCleanupInterval()
require.NotNil(t, cleanupInterval)
assert.Equal(t, time.Duration(1*time.Hour), *cleanupInterval)
})
t.Run("positive value", func(t *testing.T) {
// A positive value is interpreted in seconds
md.Properties["cleanupIntervalInSeconds"] = "10"
storeObj := state_cockroach.New(log).(*state_cockroach.CockroachDB)
err := storeObj.Init(context.Background(), md)
require.NoError(t, err, "failed to init")
defer storeObj.Close()
dbAccess := storeObj.GetDBAccess().(*state_cockroach.CockroachDBAccess)
require.NotNil(t, dbAccess)
cleanupInterval := dbAccess.GetCleanupInterval()
require.NotNil(t, cleanupInterval)
assert.Equal(t, time.Duration(10*time.Second), *cleanupInterval)
})
t.Run("disabled", func(t *testing.T) {
// A value of <=0 means that the cleanup is disabled
md.Properties["cleanupIntervalInSeconds"] = "0"
storeObj := state_cockroach.New(log).(*state_cockroach.CockroachDB)
err := storeObj.Init(context.Background(), md)
require.NoError(t, err, "failed to init")
defer storeObj.Close()
dbAccess := storeObj.GetDBAccess().(*state_cockroach.CockroachDBAccess)
require.NotNil(t, dbAccess)
// When cleanup is disabled the interval is expected to be nil
cleanupInterval := dbAccess.GetCleanupInterval()
_ = assert.Nil(t, cleanupInterval)
})
})
t.Run("cleanup", func(t *testing.T) {
// Fresh metadata that shadows the outer md, so the interval written by the
// parsing subtests above does not carry over into these subtests.
md := state.Metadata{
Base: metadata.Base{
Name: "ttltest",
Properties: map[string]string{
"connectionString": "host=localhost user=root port=26257 connect_timeout=10 database=dapr_test",
"tableName": "ttl_state",
"metadataTableName": "ttl_metadata",
},
},
}
t.Run("automatically delete expiredate records", func(t *testing.T) {
// Run every second
md.Properties["cleanupIntervalInSeconds"] = "1"
storeObj := state_cockroach.New(log).(*state_cockroach.CockroachDB)
err := storeObj.Init(context.Background(), md)
require.NoError(t, err, "failed to init")
defer storeObj.Close()
// Seed the database with some records: 10 that have already expired and
// 10 that expire 4 seconds after being inserted
err = populateTTLRecords(ctx, dbClient)
require.NoError(t, err, "failed to seed records")
// Wait 3 seconds then verify we have only 10 rows left
// (the already-expired rows should be gone, the others are not expired yet)
time.Sleep(3 * time.Second)
count, err := countRowsInTable(ctx, dbClient, "ttl_state")
require.NoError(t, err, "failed to run query to count rows")
assert.Equal(t, 10, count)
// The "last-cleanup" value should be <= 1 second (+ a bit of buffer)
// loadLastCleanupInterval reports milliseconds, hence the 1200 bound
lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
require.NoError(t, err, "failed to load value for 'last-cleanup'")
assert.LessOrEqual(t, lastCleanup, int64(1200))
// Wait 6 more seconds and verify there are no more rows left
// (by now the rows seeded with a 4-second expiry have expired too)
time.Sleep(6 * time.Second)
count, err = countRowsInTable(ctx, dbClient, "ttl_state")
require.NoError(t, err, "failed to run query to count rows")
assert.Equal(t, 0, count)
// The "last-cleanup" value should be <= 1 second (+ a bit of buffer)
lastCleanup, err = loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
require.NoError(t, err, "failed to load value for 'last-cleanup'")
assert.LessOrEqual(t, lastCleanup, int64(1200))
})
t.Run("cleanup concurrency", func(t *testing.T) {
// Set to run every hour
// (we'll manually trigger more frequent iterations)
md.Properties["cleanupIntervalInSeconds"] = "3600"
storeObj := state_cockroach.New(log).(*state_cockroach.CockroachDB)
err := storeObj.Init(context.Background(), md)
require.NoError(t, err, "failed to init")
defer storeObj.Close()
// Seed the database with some records
err = populateTTLRecords(ctx, dbClient)
require.NoError(t, err, "failed to seed records")
// Validate that 20 records are present
count, err := countRowsInTable(ctx, dbClient, "ttl_state")
require.NoError(t, err, "failed to run query to count rows")
assert.Equal(t, 20, count)
// Set last-cleanup to 1s ago (less than 3600s)
// The same SQL expression is substituted twice through %[1]s: once for the
// inserted value and once for the ON CONFLICT update
_, err = dbClient.ExecContext(ctx,
fmt.Sprintf(`INSERT INTO ttl_metadata (key, value) VALUES ('last-cleanup', %[1]s) ON CONFLICT (key) DO UPDATE SET value = %[1]s`, "(CURRENT_TIMESTAMP - interval '1 second')::STRING"),
)
require.NoError(t, err, "failed to set last-cleanup")
// The "last-cleanup" value should be ~1 second (+ a bit of buffer)
lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
require.NoError(t, err, "failed to load value for 'last-cleanup'")
assert.LessOrEqual(t, lastCleanup, int64(1200))
// Remember the stored value so it can be compared after the manual cleanup
lastCleanupValueOrig, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup")
require.NoError(t, err, "failed to load absolute value for 'last-cleanup'")
require.NotEmpty(t, lastCleanupValueOrig)
// Trigger the background cleanup, which should do nothing because the last cleanup was < 3600s
dbAccess := storeObj.GetDBAccess().(*state_cockroach.CockroachDBAccess)
require.NotNil(t, dbAccess)
err = dbAccess.CleanupExpired(ctx)
require.NoError(t, err, "CleanupExpired returned an error")
// Validate that 20 records are still present
count, err = countRowsInTable(ctx, dbClient, "ttl_state")
require.NoError(t, err, "failed to run query to count rows")
assert.Equal(t, 20, count)
// The "last-cleanup" value should not have been changed
lastCleanupValue, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup")
require.NoError(t, err, "failed to load absolute value for 'last-cleanup'")
assert.Equal(t, lastCleanupValueOrig, lastCleanupValue)
})
})
return nil
}
flow.New(t, "Connecting cockroachdb And Verifying majority of the tests here").
Step(dockercompose.Run("cockroachdb", dockerComposeYAML)).
Step("Waiting for cockroachdb readiness", flow.Sleep(30*time.Second)).
@ -260,9 +447,11 @@ func TestCockroach(t *testing.T) {
embedded.WithComponentsPath("components/standard"),
runtime.WithStates(stateRegistry),
)).
Step("connect to the database", connectStep).
Step("Run basic test", basicTest).
Step("Run eTag test", eTagTest).
Step("Run transactions test", transactionsTest).
Step("run TTL test", ttlTest).
Step("Stop cockroachdb server", dockercompose.Stop("cockroachdb", dockerComposeYAML, "cockroachdb")).
Step("Sleep after dockercompose stop", flow.Sleep(10*time.Second)).
Step("Start cockroachdb server", dockercompose.Start("cockroachdb", dockerComposeYAML, "cockroachdb")).
@ -270,3 +459,62 @@ func TestCockroach(t *testing.T) {
Step("Get Values Saved Earlier And Not Expired, after cockroachdb restart", testGetAfterCockroachdbRestart).
Run()
}
// loadLastCleanupInterval returns how many milliseconds separate the
// 'last-cleanup' timestamp stored in table from the database's current time.
// The query is bounded by a 2-second timeout derived from ctx.
func loadLastCleanupInterval(ctx context.Context, dbClient *sql.DB, table string) (lastCleanup int64, err error) {
	queryCtx, stop := context.WithTimeout(ctx, 2*time.Second)
	defer stop()

	// The table name cannot be passed as a bind parameter, so it is formatted
	// into the statement.
	stmt := fmt.Sprintf("SELECT (EXTRACT('epoch' FROM CURRENT_TIMESTAMP - value::timestamp with time zone) * 1000)::bigint FROM %s WHERE key = 'last-cleanup'", table)
	row := dbClient.QueryRowContext(queryCtx, stmt)
	err = row.Scan(&lastCleanup)
	return lastCleanup, err
}
// countRowsInTable runs "SELECT COUNT(key)" against table and returns the
// result. The query is bounded by a 2-second timeout derived from ctx.
func countRowsInTable(ctx context.Context, dbClient *sql.DB, table string) (count int, err error) {
	queryCtx, stop := context.WithTimeout(ctx, 2*time.Second)
	defer stop()

	row := dbClient.QueryRowContext(queryCtx, "SELECT COUNT(key) FROM "+table)
	err = row.Scan(&count)
	return count, err
}
// populateTTLRecords seeds the ttl_state table with 20 rows: 10 whose
// expiration lies one minute in the past ("expired_N") followed by 10 that
// expire 4 seconds after insertion ("notexpired_N").
// All inserts share a single 2-second timeout; the first failing insert
// aborts the seeding and its error is returned unchanged.
func populateTTLRecords(ctx context.Context, dbClient *sql.DB) error {
	// seed describes one row to insert.
	type seed struct {
		key   string
		value json.RawMessage
		// expiry is a raw SQL expression spliced into the statement text; it
		// is not sent as a bind parameter.
		expiry string
	}

	const perGroup = 10
	seeds := make([]seed, 0, 2*perGroup)
	for n := 0; n < perGroup; n++ {
		seeds = append(seeds, seed{
			key:    fmt.Sprintf("expired_%d", n),
			value:  json.RawMessage(fmt.Sprintf(`"value_%d"`, n)),
			expiry: "CURRENT_TIMESTAMP - INTERVAL '1 minutes'",
		})
	}
	for n := 0; n < perGroup; n++ {
		seeds = append(seeds, seed{
			key:    fmt.Sprintf("notexpired_%d", n),
			value:  json.RawMessage(fmt.Sprintf(`"value_%d"`, n)),
			expiry: "CURRENT_TIMESTAMP + INTERVAL '4 seconds'",
		})
	}

	insertCtx, stop := context.WithTimeout(ctx, 2*time.Second)
	defer stop()

	for idx := range seeds {
		rec := &seeds[idx]
		stmt := fmt.Sprintf("INSERT INTO ttl_state (key, value, isbinary, expiredate) VALUES ($1, $2, $3, %s)", rec.expiry)
		if _, err := dbClient.ExecContext(insertCtx, stmt, rec.key, rec.value, false); err != nil {
			return err
		}
	}
	return nil
}
// getValueFromMetadataTable returns the value column of the row identified by
// key in the given metadata table. The query is bounded by a 2-second timeout
// derived from ctx; the error from scanning the row is returned as-is.
func getValueFromMetadataTable(ctx context.Context, dbClient *sql.DB, table, key string) (value string, err error) {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	// Deferred so the timeout's resources are released on every exit path,
	// including a panic while querying or scanning.
	defer cancel()

	// Only the table name is formatted into the statement; key is sent as a
	// bind parameter.
	err = dbClient.
		QueryRowContext(ctx, fmt.Sprintf("SELECT value FROM %s WHERE key = $1", table), key).
		Scan(&value)
	return value, err
}

View File

@ -51,7 +51,7 @@ components:
operations: [ "set", "get", "delete", "bulkset", "bulkdelete"]
- component: cockroachdb
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "query" ]
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "query", "ttl" ]
- component: rethinkdb
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete"]