Cleanup interval uses metadata table to avoid multiple executions simultaneously

Also bug fixes

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-12-01 19:31:23 +00:00
parent b782b520aa
commit 0b340d11d3
2 changed files with 49 additions and 9 deletions

View File

@ -101,10 +101,13 @@ func (m *migrations) Perform(ctx context.Context) error {
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
_, err = metadataTx.ExecContext(queryCtx,
fmt.Sprintf(`UPDATE %s SET value = $1 WHERE key = 'migrations'`, m.MetadataTableName),
fmt.Sprintf(`INSERT INTO %s (key, value) VALUES ('migrations', $1) ON CONFLICT (key) DO UPDATE SET value = $1`, m.MetadataTableName),
strconv.Itoa(i+1),
)
cancel()
if err != nil {
return fmt.Errorf("failed to update migration level in metadata table: %w", err)
}
}
// Commit changes to the metadata table, which also releases the lock

View File

@ -15,11 +15,13 @@ package postgresql
import (
"context"
"crypto/rand"
"database/sql"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"time"
@ -353,7 +355,7 @@ func (p *postgresDBAccess) doDelete(db dbquerier, req *state.DeleteRequest) (err
var result sql.Result
if req.ETag == nil || *req.ETag == "" {
result, err = p.db.Exec("DELETE FROM state WHERE key = $1", req.Key)
result, err = db.Exec("DELETE FROM state WHERE key = $1", req.Key)
} else {
// Convert req.ETag to uint32 for postgres XID compatibility
var etag64 uint64
@ -449,7 +451,7 @@ func (p *postgresDBAccess) ExecuteMulti(request *state.TransactionalStateRequest
return fmt.Errorf("failed to commit transaction: %w", err)
}
return err
return nil
}
// Query executes a query against store.
@ -481,8 +483,12 @@ func (p *postgresDBAccess) scheduleCleanupExpiredData() {
p.logger.Infof("Schedule expired data clean up every %d seconds", int(p.cleanupInterval.Seconds()))
ticker := time.NewTicker(*p.cleanupInterval)
go func() {
// Add a randomized delay here to ensure that not all sidecars will get here at the same time
delay, _ := rand.Int(rand.Reader, big.NewInt(20))
time.Sleep(time.Duration(delay.Int64()) * time.Second)
ticker := time.NewTicker(*p.cleanupInterval)
for {
select {
case <-ticker.C:
@ -504,26 +510,57 @@ func (p *postgresDBAccess) cleanupTimeout() error {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
// Acquire a lock for the metadata table
// We use NOWAIT because if there's another lock, another process is doing a cleanup so nothing to see there
queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
_, err = tx.ExecContext(queryCtx, fmt.Sprintf("LOCK TABLE %s IN SHARE MODE NOWAIT", p.metadata.MetadataTableName))
cancel()
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
// Check when the last iteration was
var (
lastCleanupStr string
lastCleanup int
)
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
err = tx.QueryRowContext(queryCtx,
fmt.Sprintf(`SELECT value FROM %s WHERE key = 'last-cleanup'`, p.metadata.MetadataTableName),
).Scan(&lastCleanupStr)
cancel()
if err == nil {
lastCleanup, err = strconv.Atoi(lastCleanupStr)
if err != nil || lastCleanup < 0 {
p.logger.Warnf("Invalid last cleanup time found in metadata table: %s", lastCleanupStr)
}
lastCleanup = 0
} else if !errors.Is(err, sql.ErrNoRows) {
p.logger.Warnf("Failed to read last cleanup time from database: %v", err)
lastCleanup = 0
}
// Note we're not using the transaction here as we don't want this to be rolled back or to lock the table unnecessarily
// Need to use fmt.Sprintf because we can't parametrize a table name
//nolint:gosec
stmt := fmt.Sprintf(`DELETE FROM %s WHERE expiredate IS NOT NULL AND expiredate < CURRENT_TIMESTAMP`, p.metadata.TableName)
res, err := tx.Exec(stmt)
res, err := p.db.ExecContext(ctx, stmt)
if err != nil {
return fmt.Errorf("failed to execute query: %v", err)
return fmt.Errorf("failed to execute query: %w", err)
}
cleaned, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to count affected rows: %v", err)
return fmt.Errorf("failed to count affected rows: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %v", err)
return fmt.Errorf("failed to commit transaction: %w", err)
}
p.logger.Infof("Removed %d expired rows", cleaned)