Add daemon checkpointing (#3863)

Updates #3840.
This commit is contained in:
Roland Bracewell Shoemaker 2018-09-21 12:51:11 -07:00 committed by Daniel McCarney
parent 0bfbab3bb4
commit ba1fb8b3c3
3 changed files with 120 additions and 24 deletions

View File

@ -8,6 +8,8 @@
"gracePeriod": "168h",
"batchSize": 1000,
"maxAuthzs": 10000,
"parallelism": 20
"parallelism": 20,
"pendingCheckpointFile": "/tmp/pending-checkpoint",
"finalCheckpointFile": "/tmp/final-checkpoint"
}
}

View File

@ -30,6 +30,14 @@ type eapConfig struct {
BatchSize int
MaxAuthzs int
Parallelism uint
// PendingCheckpointFile is the path to a file which is used to store the
// last pending authorization ID which was deleted. If path is to a file
// which does not exist it will be created.
PendingCheckpointFile string
// FinalCheckpointFile is the path to a file which is used to store the
// last authorization ID which was deleted. If path is to a file
// which does not exist it will be created.
FinalCheckpointFile string
Features map[string]bool
}
@ -43,6 +51,41 @@ type expiredAuthzPurger struct {
batchSize int64
}
// loadCheckpoint reads a string (which is assumed to be an authorization ID)
// from the file at the provided path and returns it to the caller. If the
// file does not exist an error is not returned and the returned ID is an
// empty string.
func loadCheckpoint(checkpointFile string) (string, error) {
content, err := ioutil.ReadFile(checkpointFile)
if err != nil {
if os.IsNotExist(err) {
return "", nil
}
return "", err
}
return string(content), nil
}
// saveCheckpoint atomically writes the provided ID to the provided file. The
// method os.Rename makes use of the renameat syscall to atomically replace
// one file with another. It creates a temporary file in a temporary directory
// before using os.Rename to replace the old file with the new one.
func saveCheckpoint(checkpointFile, id string) error {
tmpDir, err := ioutil.TempDir("", "checkpoint-tmp")
if err != nil {
return err
}
defer func() { _ = os.RemoveAll(tmpDir) }()
tmp, err := ioutil.TempFile(tmpDir, "checkpoint-atomic")
if err != nil {
return err
}
if _, err = tmp.Write([]byte(id)); err != nil {
return err
}
return os.Rename(tmp.Name(), checkpointFile)
}
// getWork selects a set of authorizations that expired before purgeBefore, bounded by batchSize,
// that have IDs that are more than initialID from either the pendingAuthorizations or authz tables
// and adds them to the work channel. It returns the last ID it selected and the number of IDs it
@ -68,7 +111,7 @@ func (p *expiredAuthzPurger) getWork(work chan string, query string, initialID s
var lastID string
for _, v := range idBatch {
work <- v
count += 1
count++
lastID = v
}
return lastID, count, nil
@ -88,7 +131,7 @@ func (p *expiredAuthzPurger) getWork(work chan string, query string, initialID s
// purge. If getWork returns the same ID that was passed to it then it will
// sleep a minute before looking for more authorizations again, starting at the
// same ID.
func (p *expiredAuthzPurger) purge(table string, purgeBefore time.Time, parallelism int, max int, daemon bool) error {
func (p *expiredAuthzPurger) purge(table string, purgeBefore time.Time, parallelism int, max int, daemon bool, checkpointFile string) error {
var query string
switch table {
case "pendingAuthorizations":
@ -97,10 +140,18 @@ func (p *expiredAuthzPurger) purge(table string, purgeBefore time.Time, parallel
query = "SELECT id FROM authz WHERE id >= :id AND expires <= :expires ORDER BY id LIMIT :limit"
}
// id starts as "", which is smaller than all other ids.
var id string
if checkpointFile != "" {
startID, err := loadCheckpoint(checkpointFile)
if err != nil {
return err
}
id = startID
}
work := make(chan string)
go func() {
// id starts as "", which is smaller than all other ids.
var id string
var count int
var working func() bool
@ -138,7 +189,15 @@ func (p *expiredAuthzPurger) purge(table string, purgeBefore time.Time, parallel
if err != nil {
p.log.AuditErrf("Deleting %s: %s", id, err)
}
atomic.AddInt64(&deleted, 1)
numDeleted := atomic.AddInt64(&deleted, 1)
// Only checkpoint every 1000 IDs in order to prevent unnecessary churn
// in the checkpoint file
if checkpointFile != "" && numDeleted%1000 == 0 {
err = saveCheckpoint(checkpointFile, id)
if err != nil {
p.log.AuditErrf("failed to checkpoint %q table at ID %q: %s", table, id, err)
}
}
}
}()
}
@ -170,18 +229,6 @@ func deleteAuthorization(db *gorp.DbMap, table, id string) error {
return nil
}
func (p *expiredAuthzPurger) purgeAuthzs(purgeBefore time.Time, parallelism int, max int, daemon bool) error {
// Purge authz first because it tends to be bigger and in more need of
// purging.
for _, table := range []string{"authz", "pendingAuthorizations"} {
err := p.purge(table, purgeBefore, parallelism, max, daemon)
if err != nil {
return err
}
}
return nil
}
func main() {
daemon := flag.Bool("daemon", false, "Runs the expired-authz-purger in daemon mode")
configPath := flag.String("config", "config.json", "Path to Boulder configuration file")
@ -228,7 +275,33 @@ func main() {
}
purgeBefore := purger.clk.Now().Add(-config.ExpiredAuthzPurger.GracePeriod.Duration)
logger.Info("Beginning purge")
err = purger.purgeAuthzs(purgeBefore, int(config.ExpiredAuthzPurger.Parallelism),
int(config.ExpiredAuthzPurger.MaxAuthzs), *daemon)
cmd.FailOnError(err, "Failed to purge authorizations")
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
err := purger.purge(
"authz",
purgeBefore,
int(config.ExpiredAuthzPurger.Parallelism),
int(config.ExpiredAuthzPurger.MaxAuthzs),
*daemon,
config.ExpiredAuthzPurger.FinalCheckpointFile,
)
cmd.FailOnError(err, "Failed to purge authorizations")
}()
wg.Add(1)
go func() {
defer wg.Done()
err := purger.purge(
"pendingAuthorizations",
purgeBefore,
int(config.ExpiredAuthzPurger.Parallelism),
int(config.ExpiredAuthzPurger.MaxAuthzs),
*daemon,
config.ExpiredAuthzPurger.PendingCheckpointFile,
)
cmd.FailOnError(err, "Failed to purge authorizations")
}()
wg.Wait()
}

View File

@ -33,7 +33,14 @@ func TestPurgeAuthzs(t *testing.T) {
p := expiredAuthzPurger{log, fc, dbMap, 1}
err = p.purgeAuthzs(time.Time{}, 10, 100, false)
err = p.purge(
"pendingAuthorizations",
time.Time{},
10,
100,
false,
"",
)
test.AssertNotError(t, err, "purgeAuthzs failed")
old, new := fc.Now().Add(-time.Hour), fc.Now().Add(time.Hour)
@ -58,7 +65,14 @@ func TestPurgeAuthzs(t *testing.T) {
})
test.AssertNotError(t, err, "NewPendingAuthorization failed")
err = p.purgeAuthzs(fc.Now(), 10, 100, false)
err = p.purge(
"pendingAuthorizations",
fc.Now(),
10,
100,
false,
"",
)
test.AssertNotError(t, err, "purgeAuthzs failed")
count, err := dbMap.SelectInt("SELECT COUNT(1) FROM pendingAuthorizations")
test.AssertNotError(t, err, "dbMap.SelectInt failed")
@ -67,7 +81,14 @@ func TestPurgeAuthzs(t *testing.T) {
test.AssertNotError(t, err, "dbMap.SelectInt failed")
test.AssertEquals(t, count, int64(1))
err = p.purgeAuthzs(fc.Now().Add(time.Hour), 10, 100, false)
err = p.purge(
"pendingAuthorizations",
fc.Now().Add(time.Hour),
10,
100,
false,
"",
)
test.AssertNotError(t, err, "purgeAuthzs failed")
count, err = dbMap.SelectInt("SELECT COUNT(1) FROM pendingAuthorizations")
test.AssertNotError(t, err, "dbMap.SelectInt failed")