Speed up expired authz purger (#3267)
Now, rather than LIMIT / OFFSET, this uses the highest id from the last batch in each new batch's query. This makes efficient use of the index, and means the database does not have to scan over a large number of non-expired rows before starting to find any expired rows. This also changes the structure of the purge function to continually push ids for deletion onto a channel, to be processed by goroutines consuming that channel. Also, remove the --yes flag and prompting.
This commit is contained in:
parent
68d5cc3331
commit
90f7998b15
|
|
@ -1,16 +1,13 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jmhodges/clock"
|
||||
|
|
@ -46,8 +43,7 @@ type expiredAuthzPurger struct {
|
|||
}
|
||||
|
||||
// purge looks up pending or finalized authzs (depending on the value of
|
||||
// `table`) that expire before `purgeBefore`. If `yes` is true, or if a user at
|
||||
// the terminal types "y", it will then delete those authzs, using `parallelism`
|
||||
// `table`) that expire before `purgeBefore`, using `parallelism`
|
||||
// goroutines. It will delete a maximum of `max` authzs.
|
||||
// Neither table has an index on `expires` by itself, so we just iterate through
|
||||
// the table with LIMIT and OFFSET using the default ordering. Note that this
|
||||
|
|
@ -55,70 +51,53 @@ type expiredAuthzPurger struct {
|
|||
// database will have to scan through many rows before it finds some that meet
|
||||
// the expiration criteria. When we move to better authz storage (#2620), we
|
||||
// will get an appropriate index that will make this cheaper.
|
||||
func (p *expiredAuthzPurger) purge(table string, yes bool, purgeBefore time.Time, parallelism int, max int) error {
|
||||
var ids []string
|
||||
for len(ids) < max {
|
||||
var idBatch []string
|
||||
var query string
|
||||
switch table {
|
||||
case "pendingAuthorizations":
|
||||
query = "SELECT id FROM pendingAuthorizations WHERE expires <= ? ORDER BY id LIMIT ? OFFSET ?"
|
||||
case "authz":
|
||||
query = "SELECT id FROM authz WHERE expires <= ? ORDER BY id LIMIT ? OFFSET ?"
|
||||
}
|
||||
_, err := p.db.Select(
|
||||
&idBatch,
|
||||
query,
|
||||
purgeBefore,
|
||||
p.batchSize,
|
||||
len(ids),
|
||||
)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
if len(idBatch) == 0 {
|
||||
break
|
||||
}
|
||||
ids = append(ids, idBatch...)
|
||||
}
|
||||
if len(ids) > max {
|
||||
ids = ids[:max]
|
||||
func (p *expiredAuthzPurger) purge(table string, purgeBefore time.Time, parallelism int, max int) error {
|
||||
var query string
|
||||
switch table {
|
||||
case "pendingAuthorizations":
|
||||
query = "SELECT id FROM pendingAuthorizations WHERE id >= :id AND expires <= :expires ORDER BY id LIMIT :limit"
|
||||
case "authz":
|
||||
query = "SELECT id FROM authz WHERE id >= :id AND expires <= :expires ORDER BY id LIMIT :limit"
|
||||
}
|
||||
|
||||
if !yes {
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
for {
|
||||
fmt.Fprintf(
|
||||
os.Stdout,
|
||||
"\nAbout to purge %d authorizations from %s and all associated challenges, proceed? [y/N]: ",
|
||||
len(ids),
|
||||
table,
|
||||
done := make(chan int)
|
||||
work := make(chan string)
|
||||
go func() {
|
||||
// id starts as "", which is smaller than all other ids.
|
||||
var id string
|
||||
var count int
|
||||
for count < max {
|
||||
var idBatch []string
|
||||
_, err := p.db.Select(
|
||||
&idBatch,
|
||||
query,
|
||||
map[string]interface{}{
|
||||
"id": id,
|
||||
"expires": purgeBefore,
|
||||
"limit": p.batchSize,
|
||||
},
|
||||
)
|
||||
text, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
text = strings.ToLower(text)
|
||||
if text != "y\n" && text != "n\n" && text != "\n" {
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
p.log.AuditErr(fmt.Sprintf("Getting a batch: %s", err))
|
||||
time.Sleep(10)
|
||||
continue
|
||||
}
|
||||
if text == "n\n" || text == "\n" {
|
||||
os.Exit(0)
|
||||
} else {
|
||||
for _, v := range idBatch {
|
||||
work <- v
|
||||
count += 1
|
||||
// Start the next query at the highest id we saw in this batch.
|
||||
id = v
|
||||
}
|
||||
p.log.Info(fmt.Sprintf("Deleted %d authzs from %s so far", count, table))
|
||||
if len(idBatch) < int(p.batchSize) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
close(work)
|
||||
done <- count
|
||||
}()
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
work := make(chan string)
|
||||
go func() {
|
||||
for _, id := range ids {
|
||||
work <- id
|
||||
}
|
||||
close(work)
|
||||
}()
|
||||
var deletions int64
|
||||
for i := 0; i < parallelism; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
|
|
@ -128,19 +107,14 @@ func (p *expiredAuthzPurger) purge(table string, yes bool, purgeBefore time.Time
|
|||
if err != nil {
|
||||
p.log.AuditErr(fmt.Sprintf("Deleting %s: %s", id, err))
|
||||
}
|
||||
atomic.AddInt64(&deletions, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
for _ = range time.Tick(10 * time.Second) {
|
||||
p.log.Info(fmt.Sprintf("Deleted %d authzs from %s so far", deletions, table))
|
||||
}
|
||||
}()
|
||||
|
||||
count := <-done
|
||||
wg.Wait()
|
||||
|
||||
p.log.Info(fmt.Sprintf("Deleted a total of %d expired authorizations from %s", len(ids), table))
|
||||
p.log.Info(fmt.Sprintf("Deleted a total of %d expired authorizations from %s", count, table))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -166,9 +140,11 @@ func deleteAuthorization(db *gorp.DbMap, table, id string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *expiredAuthzPurger) purgeAuthzs(purgeBefore time.Time, yes bool, parallelism int, max int) error {
|
||||
for _, table := range []string{"pendingAuthorizations", "authz"} {
|
||||
err := p.purge(table, yes, purgeBefore, parallelism, max)
|
||||
func (p *expiredAuthzPurger) purgeAuthzs(purgeBefore time.Time, parallelism int, max int) error {
|
||||
// Purge authz first because it tends to be bigger and in more need of
|
||||
// purging.
|
||||
for _, table := range []string{"authz", "pendingAuthorizations"} {
|
||||
err := p.purge(table, purgeBefore, parallelism, max)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -177,7 +153,6 @@ func (p *expiredAuthzPurger) purgeAuthzs(purgeBefore time.Time, yes bool, parall
|
|||
}
|
||||
|
||||
func main() {
|
||||
yes := flag.Bool("yes", false, "Skips the purge confirmation")
|
||||
configPath := flag.String("config", "config.json", "Path to Boulder configuration file")
|
||||
flag.Parse()
|
||||
|
||||
|
|
@ -222,7 +197,7 @@ func main() {
|
|||
}
|
||||
purgeBefore := purger.clk.Now().Add(-config.ExpiredAuthzPurger.GracePeriod.Duration)
|
||||
logger.Info("Beginning purge")
|
||||
err = purger.purgeAuthzs(purgeBefore, *yes, int(config.ExpiredAuthzPurger.Parallelism),
|
||||
err = purger.purgeAuthzs(purgeBefore, int(config.ExpiredAuthzPurger.Parallelism),
|
||||
int(config.ExpiredAuthzPurger.MaxAuthzs))
|
||||
cmd.FailOnError(err, "Failed to purge authorizations")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ func TestPurgeAuthzs(t *testing.T) {
|
|||
|
||||
p := expiredAuthzPurger{log, fc, dbMap, 1}
|
||||
|
||||
err = p.purgeAuthzs(time.Time{}, true, 10, 100)
|
||||
err = p.purgeAuthzs(time.Time{}, 10, 100)
|
||||
test.AssertNotError(t, err, "purgeAuthzs failed")
|
||||
|
||||
old, new := fc.Now().Add(-time.Hour), fc.Now().Add(time.Hour)
|
||||
|
|
@ -58,7 +58,7 @@ func TestPurgeAuthzs(t *testing.T) {
|
|||
})
|
||||
test.AssertNotError(t, err, "NewPendingAuthorization failed")
|
||||
|
||||
err = p.purgeAuthzs(fc.Now(), true, 10, 100)
|
||||
err = p.purgeAuthzs(fc.Now(), 10, 100)
|
||||
test.AssertNotError(t, err, "purgeAuthzs failed")
|
||||
count, err := dbMap.SelectInt("SELECT COUNT(1) FROM pendingAuthorizations")
|
||||
test.AssertNotError(t, err, "dbMap.SelectInt failed")
|
||||
|
|
@ -67,7 +67,7 @@ func TestPurgeAuthzs(t *testing.T) {
|
|||
test.AssertNotError(t, err, "dbMap.SelectInt failed")
|
||||
test.AssertEquals(t, count, int64(1))
|
||||
|
||||
err = p.purgeAuthzs(fc.Now().Add(time.Hour), true, 10, 100)
|
||||
err = p.purgeAuthzs(fc.Now().Add(time.Hour), 10, 100)
|
||||
test.AssertNotError(t, err, "purgeAuthzs failed")
|
||||
count, err = dbMap.SelectInt("SELECT COUNT(1) FROM pendingAuthorizations")
|
||||
test.AssertNotError(t, err, "dbMap.SelectInt failed")
|
||||
|
|
|
|||
|
|
@ -315,7 +315,7 @@ def get_future_output(cmd, date):
|
|||
|
||||
def test_expired_authz_purger():
|
||||
def expect(target_time, num, table):
|
||||
out = get_future_output("./bin/expired-authz-purger --config cmd/expired-authz-purger/config.json --yes", target_time)
|
||||
out = get_future_output("./bin/expired-authz-purger --config cmd/expired-authz-purger/config.json", target_time)
|
||||
if 'via FAKECLOCK' not in out:
|
||||
raise Exception("expired-authz-purger was not built with `integration` build tag")
|
||||
if num is None:
|
||||
|
|
|
|||
Loading…
Reference in New Issue