Delete boulder-janitor (#5571)
Delete the boulder-janitor binary, and the various configs and tests which exist to support it. This tool has not been actively running in quite some time. The tables it covers are either supported by our more recent partitioning methods, or are rate-limit tables that we hope to move out of MySQL entirely. The cost of maintaining the janitor is not offset by the benefits it brings us (or the lack thereof). Fixes #5569
This commit is contained in:
parent 36bca21e7e
commit ac3e5e70c4
@@ -1,66 +0,0 @@
package janitor

import (
    "context"
    "fmt"

    "github.com/letsencrypt/boulder/db"
)

// deleteHandlers is a map of json-usable strings to actual functions, so that
// configs can specify a delete handler by name.
var deleteHandlers = map[string]func(*batchedDBJob, int64) error{
    "default":     deleteDefault,
    "deleteOrder": deleteOrder,
}

// deleteDefault performs a delete of the given ID from the batchedDBJob's
// table or returns an error. It does not use a transaction and assumes there
// are no foreign key constraints or referencing rows in other tables to manage.
func deleteDefault(j *batchedDBJob, id int64) error {
    // NOTE(@cpu): We throw away the sql.Result here without checking the rows
    // affected because the query is always specific to the ID auto-increment
    // primary key. If there are multiple rows with the same primary key MariaDB
    // has failed us deeply.
    query := fmt.Sprintf(`DELETE FROM %s WHERE id = ?`, j.table)
    if _, err := j.db.Exec(query, id); err != nil {
        return err
    }
    j.log.Debugf("deleted ID %d in table %q", id, j.table)
    deletedStat.WithLabelValues(j.table).Inc()
    return nil
}

// deleteOrder performs a delete of the given ID from the batchedDBJob's `orders`
// table or returns an error. It also deletes corresponding rows from three
// other tables which reference the `orders` table by foreign key.
func deleteOrder(j *batchedDBJob, orderID int64) error {
    ctx := context.Background()
    // Perform a multi-table delete inside of a transaction using the order ID.
    // Either all of the rows associated with the order ID will be deleted or the
    // transaction will be rolled back.
    _, err := db.WithTransaction(ctx, j.db, func(txWithCtx db.Executor) (interface{}, error) {
        // Delete table rows in the childTables that reference the order being deleted.
        childTables := []string{"requestedNames", "orderFqdnSets", "orderToAuthz2"}
        for _, t := range childTables {
            query := fmt.Sprintf(`DELETE FROM %s WHERE orderID = ?`, t)
            res, err := txWithCtx.Exec(query, orderID)
            if err != nil {
                return nil, err
            }
            affected, err := res.RowsAffected()
            if err != nil {
                return nil, err
            }
            deletedStat.WithLabelValues(t).Add(float64(affected))
        }
        // Finally delete the order itself
        if _, err := txWithCtx.Exec(`DELETE FROM orders WHERE id = ?`, orderID); err != nil {
            return nil, err
        }
        deletedStat.WithLabelValues("orders").Inc()
        j.log.Debugf("deleted order ID %d and associated rows", orderID)
        return nil, nil
    })
    return err
}
@@ -1,133 +0,0 @@
package janitor

import (
    "context"
    "net"
    "testing"
    "time"

    "github.com/letsencrypt/boulder/cmd"
    "github.com/letsencrypt/boulder/core"
    corepb "github.com/letsencrypt/boulder/core/proto"
    berrors "github.com/letsencrypt/boulder/errors"
    "github.com/letsencrypt/boulder/metrics"
    "github.com/letsencrypt/boulder/sa"
    sapb "github.com/letsencrypt/boulder/sa/proto"
    "github.com/letsencrypt/boulder/sa/satest"
    "github.com/letsencrypt/boulder/test"
    "github.com/letsencrypt/boulder/test/vars"
)

func TestDeleteOrder(t *testing.T) {
    ctx := context.Background()
    log, fc := setup()

    // Create one dbMap for the SA with the SA user.
    dbMap, err := sa.NewDbMap(vars.DBConnSA, sa.DbSettings{})
    test.AssertNotError(t, err, "error creating db map")
    // Create a SSA backed by the SA user dbMap
    ssa, err := sa.NewSQLStorageAuthority(dbMap, dbMap, fc, log, metrics.NoopRegisterer, 1)
    test.AssertNotError(t, err, "error creating SA")

    // Don't forget to cleanup!
    defer func() {
        test.ResetSATestDatabase(t)
    }()

    // Create a test registration
    jwk, _ := satest.GoodJWK().MarshalJSON()
    initialIP, _ := net.ParseIP("127.0.0.1").MarshalText()
    reg, err := ssa.NewRegistration(ctx, &corepb.Registration{
        Key:       jwk,
        InitialIP: initialIP,
    })
    test.AssertNotError(t, err, "error creating test registration")

    // Create a test authorization
    expires := fc.Now().Add(time.Hour).UTC().UnixNano()
    authzA := &corepb.Authorization{
        Identifier:     "test.example.com",
        RegistrationID: reg.Id,
        Status:         string(core.StatusPending),
        Expires:        expires,
        Challenges: []*corepb.Challenge{
            {
                Status: string(core.StatusPending),
                Type:   string(core.ChallengeTypeDNS01),
                Token:  "YXNkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
            },
        },
    }
    req := &sapb.AddPendingAuthorizationsRequest{Authz: []*corepb.Authorization{authzA}}
    ids, err := ssa.NewAuthorizations2(context.Background(), req)
    test.AssertNotError(t, err, "error adding test authz2")
    test.AssertEquals(t, len(ids.Ids), 1)

    // Create a test order referencing the test registration
    testOrder, err := ssa.NewOrder(ctx, &corepb.Order{
        RegistrationID:   reg.Id,
        Status:           string(core.StatusPending),
        Expires:          expires,
        Names:            []string{"test.example.com"},
        V2Authorizations: []int64{ids.Ids[0]},
    })
    test.AssertNotError(t, err, "error creating test order")

    // Create a cleanup config for the Orders job
    config := JobConfig{
        Enabled:       true,
        Table:         "orders",
        ExpiresColumn: "expires",
        WorkSleep:     cmd.ConfigDuration{Duration: time.Second},
        BatchSize:     1,
        MaxDPS:        1,
        Parallelism:   1,
        DeleteHandler: "deleteOrder",
    }

    // Create a dbMap for the janitor user. We don't want to use the SA dbMap
    // because it doesn't have DELETE grants.
    janitorDbMap, err := sa.NewDbMap("janitor@tcp(boulder-mysql:3306)/boulder_sa_test", sa.DbSettings{})
    test.AssertNotError(t, err, "error creating db map")

    // Create an Orders job
    j := newJob(config, janitorDbMap, log, fc)

    // Delete the mock order by its ID
    err = j.deleteHandler(j, testOrder.Id)
    // It should not error
    test.AssertNotError(t, err, "error calling deleteHandler")

    // The order should be gone
    _, err = ssa.GetOrder(ctx, &sapb.OrderRequest{Id: testOrder.Id})
    test.AssertError(t, err, "found order after deleting it")
    test.AssertErrorIs(t, err, berrors.NotFound)

    // The orderToAuthz2 rows should be gone
    var authzIDs []int64
    _, err = janitorDbMap.Select(
        &authzIDs,
        "SELECT authzID FROM orderToAuthz2 WHERE orderID = ?;",
        testOrder.Id)
    test.AssertNotError(t, err, "error finding orderToAuthz2 rows")
    test.AssertEquals(t, len(authzIDs), 0)

    // The requested names rows should be gone
    var requestedNamesIDs []int64
    _, err = janitorDbMap.Select(
        &requestedNamesIDs,
        "SELECT id FROM requestedNames WHERE orderID = ?;",
        testOrder.Id)
    test.AssertNotError(t, err, "error finding requestedNames rows")
    test.AssertEquals(t, len(requestedNamesIDs), 0)

    // The orderFqdnSets rows should be gone
    var orderFqdnSetIDs []int64
    _, err = janitorDbMap.Select(
        &requestedNamesIDs,
        "SELECT id FROM orderFqdnSets WHERE orderID = ?;",
        testOrder.Id)
    test.AssertNotError(t, err, "error finding orderFqdnSets rows")
    test.AssertEquals(t, len(orderFqdnSetIDs), 0)
}
@@ -1,131 +0,0 @@
package janitor

import (
    "errors"

    "github.com/jmhodges/clock"
    "github.com/letsencrypt/boulder/cmd"
    "github.com/letsencrypt/boulder/db"
    "github.com/letsencrypt/boulder/features"
    blog "github.com/letsencrypt/boulder/log"
    "github.com/letsencrypt/boulder/sa"
)

var (
    // errNoJobsConfigured is returned from New() when there are no jobs enabled
    // in the provided Config.
    errNoJobsConfigured = errors.New("no jobs enabled in configuration")
)

// JanitorConfig is an exported type which can have json config values
// marshalled into it. It is the input to New(), below.
type JanitorConfig struct {
    // Syslog holds common syslog configuration.
    Syslog cmd.SyslogConfig
    // DebugAddr controls the bind address for prometheus metrics, etc.
    DebugAddr string
    // Features holds potential Feature flags.
    Features map[string]bool

    // Common database connection configuration.
    DB cmd.DBConfig

    // JobConfigs is a list of configs for individual cleanup jobs.
    JobConfigs []JobConfig
}

// Janitor is a struct for a long-running cleanup daemon tasked with multiple
// cleanup jobs.
type Janitor struct {
    log  blog.Logger
    clk  clock.Clock
    db   db.DatabaseMap
    jobs []*batchedDBJob
}

// New creates a janitor instance from the provided configuration or errors. The
// janitor will not be running until its Run() function is invoked.
func New(clk clock.Clock, config JanitorConfig) (*Janitor, error) {
    if config.DebugAddr == "" {
        return nil, errors.New("metricsAddr must not be empty")
    }

    // Enable configured feature flags
    if err := features.Set(config.Features); err != nil {
        return nil, err
    }

    // Setup logging and stats
    scope, logger := cmd.StatsAndLogging(config.Syslog, config.DebugAddr)
    scope.MustRegister(errStat)
    scope.MustRegister(deletedStat)
    scope.MustRegister(workStat)
    defer logger.AuditPanic()
    logger.Info(cmd.VersionString())

    // Create DB Map
    dbURL, err := config.DB.URL()
    if err != nil {
        return nil, err
    }
    dbSettings := sa.DbSettings{
        MaxOpenConns:    config.DB.MaxOpenConns,
        MaxIdleConns:    config.DB.MaxIdleConns,
        ConnMaxLifetime: config.DB.ConnMaxLifetime.Duration,
        ConnMaxIdleTime: config.DB.ConnMaxIdleTime.Duration,
    }
    dbMap, err := sa.NewDbMap(dbURL, dbSettings)
    if err != nil {
        return nil, err
    }
    sa.SetSQLDebug(dbMap, logger)

    // Construct configured jobs
    jobs, err := newJobs(config.JobConfigs, dbMap, logger, clk)
    if err != nil {
        return nil, err
    }

    return &Janitor{
        log:  logger,
        clk:  clk,
        db:   dbMap,
        jobs: jobs,
    }, nil
}

// newJobs constructs a list of batchedDBJobs based on the provided config. If
// no jobs are enabled in the config then errNoJobsConfigured is returned.
func newJobs(configs []JobConfig, dbMap db.DatabaseMap, logger blog.Logger, clk clock.Clock) ([]*batchedDBJob, error) {
    var jobs []*batchedDBJob
    for _, c := range configs {
        j := newJob(c, dbMap, logger, clk)
        if j != nil {
            jobs = append(jobs, j)
        }
    }
    // There must be at least one job
    if len(jobs) == 0 {
        return nil, errNoJobsConfigured
    }
    // The jobs must all be valid
    for _, j := range jobs {
        if err := j.valid(); err != nil {
            return nil, err
        }
    }
    return jobs, nil
}

// Run starts the janitor daemon. Each configured job will start running in
// dedicated go routines. The janitor will block on the completion of these
// jobs (presently forever).
func (j *Janitor) Run() {
    waitChan := make(chan bool)
    // Run each job
    for _, job := range j.jobs {
        go job.runForever()
    }
    // Wait forever
    <-waitChan
}
@@ -1,104 +0,0 @@
package janitor

import (
    "encoding/json"
    "fmt"
    "testing"

    "github.com/jmhodges/clock"
    blog "github.com/letsencrypt/boulder/log"
    "github.com/letsencrypt/boulder/test"
)

func TestNewJobs(t *testing.T) {
    onlyCertStatusConfig := `{
        "jobConfigs": [
            {
                "enabled": false,
                "table": "certificates"
            },
            {
                "enabled": true,
                "table": "certificateStatus",
                "gracePeriod": "2184h",
                "batchSize": 1,
                "parallelism": 1
            },
            {
                "enabled": false,
                "table": "certificatesPerName"
            }
        ]
    }`
    allConfig := `{
        "jobConfigs": [
            {
                "enabled": true,
                "table": "certificates",
                "gracePeriod": "2184h",
                "batchSize": 1,
                "parallelism": 1
            },
            {
                "enabled": true,
                "table": "certificateStatus",
                "gracePeriod": "2184h",
                "batchSize": 1,
                "parallelism": 1
            },
            {
                "enabled": true,
                "table": "certificatesPerName",
                "gracePeriod": "2184h",
                "batchSize": 1,
                "parallelism": 1
            }
        ]
    }`
    testCases := []struct {
        name              string
        config            string
        expectedTableJobs []string
        expectedError     error
    }{
        {
            name:          "no jobs enabled",
            config:        `{}`,
            expectedError: errNoJobsConfigured,
        },
        {
            name:              "only certificate status enabled",
            config:            onlyCertStatusConfig,
            expectedTableJobs: []string{"certificateStatus"},
        },
        {
            name:              "only certificates enabled",
            config:            allConfig,
            expectedTableJobs: []string{"certificates", "certificateStatus", "certificatesPerName"},
        },
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            var config JanitorConfig
            err := json.Unmarshal([]byte(tc.config), &config)
            test.AssertNotError(t, err, "error unmarshaling tc Config")

            jobs, err := newJobs(config.JobConfigs, nil, blog.UseMock(), clock.NewFake())
            fmt.Printf("For config %v got error %v\n", config.JobConfigs, err)
            test.AssertEquals(t, err, tc.expectedError)

            var tableMap map[string]bool
            if err != nil {
                for _, j := range jobs {
                    tableMap[j.table] = true
                }
                for _, expected := range tc.expectedTableJobs {
                    if _, present := tableMap[expected]; !present {
                        t.Errorf("expected batchedDBJob with table %q to be present", expected)
                    }
                }
            }
        })
    }
}
@@ -1,303 +0,0 @@
package janitor

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/jmhodges/clock"
    "github.com/letsencrypt/boulder/cmd"
    "github.com/letsencrypt/boulder/db"
    blog "github.com/letsencrypt/boulder/log"
    "github.com/prometheus/client_golang/prometheus"
)

const (
    // minPurgeBefore is the smallest purgeBefore time.Duration that can be
    // configured for a job. We set this to 90 days to match the default validity
    // window of Let's Encrypt certificates.
    minPurgeBefore = time.Hour * 24 * 90
)

var (
    // errStat is a prometheus counter vector tracking the number of errors
    // experienced by the janitor during operation sliced by a table label and a
    // type label. Examples of possible type labels include "getWork" and
    // "deleteResource".
    errStat = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "janitor_errors",
            Help: "Number of errors by type the boulder-janitor has experienced.",
        },
        []string{"table", "type"})
    // deletedStat is a prometheus counter vector tracking the number of rows
    // deleted by the janitor, sliced by a table label.
    deletedStat = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "janitor_deletions",
            Help: "Number of deletions by table the boulder-janitor has performed.",
        },
        []string{"table"})
    // workStat is a prometheus counter vector tracking the number of rows found
    // during a batchedJob's getWork stage and queued into the work channel sliced
    // by a table label.
    workStat = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "janitor_workbatch",
            Help: "Number of items of work by table the boulder-janitor queued for deletion.",
        },
        []string{"table"})
)

// JobConfig describes common configuration parameters shared by all cleanup
// jobs.
type JobConfig struct {
    // Enabled controls whether the janitor will run this cleanup job.
    Enabled bool
    // Table is the name of the table which this job will clean up.
    Table string
    // ExpiresColumn is the name of the column in `Table` containing expiration datetimes.
    ExpiresColumn string
    // GracePeriod controls when a resource is old enough to be cleaned up.
    GracePeriod cmd.ConfigDuration
    // WorkSleep controls how long the janitor's work threads sleep between
    // finding no work and trying again. Defaults to a minute if not provided.
    WorkSleep cmd.ConfigDuration
    // BatchSize controls how many rows of the resource will be read from the DB
    // per-query.
    BatchSize int64
    // Parallelism controls how many independent go routines will run Delete
    // statements for old resources being cleaned up.
    Parallelism int
    // MaxDPS controls the maximum number of deletes which will be performed
    // per second in total for the resource's table across all of the parallel go
    // routines for this resource. This can be used to reduce the replication lag
    // caused by creating a very large number of delete statements.
    MaxDPS int
    // DeleteHandler is the string name of a function (found in handlers.go) to
    // use to handle deletion of rows.
    DeleteHandler string
}

// batchedDBJob is a struct abstracting the common properties of a long running
// cleanup job based on cursoring across a database table's auto incrementing
// primary key.
type batchedDBJob struct {
    db  db.DatabaseMap
    log blog.Logger
    clk clock.Clock
    // table is the name of the table that this job cleans up.
    table string
    // expiresColumn is the name of the column in `table` containing expiration datetimes.
    expiresColumn string
    // purgeBefore indicates the cut-off for the resource being cleaned up by
    // the job. Rows that are older than now - purgeBefore are deleted.
    purgeBefore time.Duration
    // workSleep is a duration that the job will sleep between getWork() calls
    // when no new work is found. If not provided, defaults to a minute.
    workSleep time.Duration
    // batchSize indicates how many database rows of work should be returned per query.
    batchSize int64
    // maxDPS optionally indicates a maximum rate of deletes to run per second.
    maxDPS int
    // parallelism controls how many independent go routines will be performing
    // cleanup deletes.
    parallelism int
    // deleteHandler is a function which will be called to handle deletions of
    // rows. The function must take a single int64 (the unique ID of the row to
    // be deleted) and return an error if deletion was unsuccessful. By default,
    // this should simply delete the single row with the given ID in `table`.
    // More complex deletion logic may be necessary e.g. if there are other
    // tables with foreign keys which reference the given row.
    deleteHandler func(job *batchedDBJob, id int64) error
}

func newJob(config JobConfig, dbMap db.DatabaseMap, log blog.Logger, clk clock.Clock) *batchedDBJob {
    if !config.Enabled {
        return nil
    }
    log.Debugf("Creating job from config: %#v", config)

    expires := "expires"
    if config.ExpiresColumn != "" {
        expires = config.ExpiresColumn
    }

    delete, ok := deleteHandlers[config.DeleteHandler]
    if !ok {
        delete = deleteDefault
    }

    return &batchedDBJob{
        db:            dbMap,
        log:           log,
        clk:           clk,
        table:         config.Table,
        expiresColumn: expires,
        purgeBefore:   config.GracePeriod.Duration,
        workSleep:     config.WorkSleep.Duration,
        batchSize:     config.BatchSize,
        maxDPS:        config.MaxDPS,
        parallelism:   config.Parallelism,
        deleteHandler: delete,
    }
}

var (
    errNoTable       = errors.New("table must not be empty")
    errNoPurgeBefore = fmt.Errorf("purgeBefore must be greater than %s", minPurgeBefore)
    errNoBatchSize   = errors.New("batchSize must be > 0")
    errNoParallelism = errors.New("parallelism must be > 0")
)

// valid checks that the batchedDBJob has all required fields set correctly and
// returns an error if not satisfied.
func (j *batchedDBJob) valid() error {
    if j.table == "" {
        return errNoTable
    }
    if j.purgeBefore <= minPurgeBefore {
        return errNoPurgeBefore
    }
    if j.batchSize <= 0 {
        return errNoBatchSize
    }
    if j.parallelism <= 0 {
        return errNoParallelism
    }
    // One strange special-case: Because certificatesPerName doesn't have a real
    // `expires` column, we use the grace period of 7 days to ensure that it
    // doesn't delete any rows that are still being used.
    if j.table == "certificatesPerName" && j.purgeBefore < time.Hour*24*7 {
        return errors.New("certificatesPerName GracePeriod must be more than 7 days")
    }
    return nil
}

type workUnit struct {
    ID      int64
    Expires time.Time
}

// getWork reads work into the provided work channel starting at the startID by
// using the batchedDBJob's configured work query, purgeBefore, and batchSize.
// If there is no error the last primary key ID written to the work channel will
// be returned, otherwise an error result is returned.
func (j batchedDBJob) getWork(work chan<- int64, startID int64) (int64, error) {
    var data []workUnit
    // This SQL query is used to find more work. It will be provided two parameters:
    // * :startID - the primary key value to start the work query from.
    // * :limit - the maximum number of rows to be returned by the query.
    // It will always return results with two columns:
    // * id - the primary key value for each row.
    // * expires - the expiry datetime used to calculate if the row is within the cutoff window.
    // Unfortunately, we have to interpolate the expires column name and the
    // table name ourselves, because you can't parameterize those fields in
    // prepared statements.
    workQuery := fmt.Sprintf(`
SELECT id, %s AS expires
FROM %s
WHERE id > :startID
LIMIT :limit`, j.expiresColumn, j.table)
    values := map[string]interface{}{
        "startID": startID,
        "limit":   j.batchSize,
    }
    _, err := j.db.Select(&data, workQuery, values)
    if err != nil && !db.IsNoRows(err) {
        return 0, err
    }
    lastID := startID
    rows := 0
    cutoff := j.clk.Now().Add(-j.purgeBefore)
    for _, v := range data {
        // We check for the expiration in code rather than doing so in the
        // database query as it allows us to impose a bound on the number
        // of rows that the database will examine. If a row is returned that
        // has an expiry after the cutoff all of the successive rows
        // should also have an expiry after the cutoff so we break from
        // the loop and ignore the rest of the results.
        if v.Expires.After(cutoff) {
            break
        }
        work <- v.ID
        rows++
        lastID = v.ID
    }
    workStat.WithLabelValues(j.table).Add(float64(rows))
    return lastID, nil
}

// cleanResource uses the configured level of parallelism to run go routines
// that read ID values from the work channel and delete the corresponding table
// rows. If the batchedDBJob configures a maxDPS rate then it will be enforced by
// synchronizing the delete operations on a ticker based on the maxDPS.
// cleanResource will block until all of the worker go routines complete.
func (j batchedDBJob) cleanResource(work <-chan int64) {
    wg := new(sync.WaitGroup)
    deleted := int64(0)

    var ticker *time.Ticker
    if j.maxDPS > 0 {
        ticker = time.NewTicker(
            time.Duration(float64(time.Second) / float64(j.maxDPS)))
    }

    for i := 0; i < j.parallelism; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for id := range work {
                if ticker != nil {
                    <-ticker.C
                }
                if err := j.deleteHandler(&j, id); err != nil {
                    j.log.Errf(
                        "error deleting ID %d from table %q: %s",
                        id, j.table, err)
                    errStat.WithLabelValues(j.table, "deleteResource").Inc()
                }
                _ = atomic.AddInt64(&deleted, 1)
            }
        }()
    }

    wg.Wait()
    j.log.Infof(
        "deleted a total of %d rows from table %q",
        deleted, j.table)
}

// runForever starts a go routine that will run forever getting work with
// getWork and deleting rows with cleanResource.
func (j batchedDBJob) runForever() {
    var id int64
    work := make(chan int64)

    go func() {
        for {
            lastID, err := j.getWork(work, id)
            if err != nil {
                j.log.Errf("error getting work for %q from ID %d: %s",
                    j.table, id, err.Error())
                errStat.WithLabelValues(j.table, "getWork").Inc()
                time.Sleep(time.Millisecond * 500)
                continue
            } else if lastID == id {
                j.log.Debugf(
                    "made no new progress on table %q. Sleeping for a minute",
                    j.table)
                if j.workSleep.Seconds() == 0 {
                    time.Sleep(time.Minute)
                } else {
                    time.Sleep(j.workSleep)
                }
            }
            id = lastID
        }
    }()

    j.cleanResource(work)
}
@@ -1,375 +0,0 @@
package janitor

import (
    "database/sql"
    "errors"
    "fmt"
    "testing"
    "time"

    "github.com/jmhodges/clock"
    "github.com/letsencrypt/boulder/db"
    blog "github.com/letsencrypt/boulder/log"
    "github.com/letsencrypt/boulder/test"
    "github.com/prometheus/client_golang/prometheus"
)

func setup() (*blog.Mock, clock.FakeClock) {
    return blog.UseMock(), clock.NewFake()
}

type mockDB struct {
    t               *testing.T
    expectedQuery   string
    expectedArgMap  map[string]interface{}
    selectResult    []workUnit
    expectedExecArg int64
    execResult      sql.Result
    errResult       error
}

func (m mockDB) Exec(query string, args ...interface{}) (sql.Result, error) {
    test.AssertEquals(m.t, query, m.expectedQuery)

    if len(args) < 1 {
        m.t.Fatal("Exec() had no args")
    } else if idArg, ok := args[0].(int64); !ok {
        m.t.Fatalf("Select()'s args[0] was %T not int64", args[0])
    } else {
        test.AssertEquals(m.t, idArg, m.expectedExecArg)
    }

    return m.execResult, m.errResult
}

func (m mockDB) Select(result interface{}, query string, args ...interface{}) ([]interface{}, error) {
    test.AssertEquals(m.t, query, m.expectedQuery)

    if len(args) < 1 {
        m.t.Fatal("Select() had no args")
    } else if argMap, ok := args[0].(map[string]interface{}); !ok {
        m.t.Fatalf("Select()'s args[0] was %T not map[string]interface{}", args[0])
    } else {
        test.AssertDeepEquals(m.t, argMap, m.expectedArgMap)
    }

    if idResults, ok := result.(*[]workUnit); !ok {
        m.t.Fatalf("Select()'s result target pointer was %T not []int64", result)
    } else {
        *idResults = append(*idResults, m.selectResult...)
    }

    return nil, m.errResult
}

func (m mockDB) SelectOne(interface{}, string, ...interface{}) error {
    return errors.New("not implemented")
}

func (m mockDB) Insert(...interface{}) error {
    return errors.New("not implemented")
}

func (m mockDB) Begin() (db.Transaction, error) {
    return nil, errors.New("not implemented")
}

func TestGetWork(t *testing.T) {
    log, clk := setup()
    startID := int64(10)
    table := "certificates"
    clk.Add(time.Hour * 5)
    resultsExpires := clk.Now().Add(-time.Hour * 2)
    batchSize := int64(20)
    workQuery := `
SELECT id, expires AS expires
FROM certificates
WHERE id > :startID
LIMIT :limit`
    mockIDs := []workUnit{
        {1, resultsExpires},
        {2, resultsExpires},
        {3, resultsExpires},
        {10, resultsExpires},
        {90, resultsExpires},
    }

    testDB := &mockDB{
        t:             t,
        expectedQuery: workQuery,
        expectedArgMap: map[string]interface{}{
            "startID": startID,
            "limit":   batchSize,
        },
    }

    workChan := make(chan int64, 5)

    job := &batchedDBJob{
        db:            testDB,
        log:           log,
        clk:           clk,
        table:         table,
        expiresColumn: "expires",
        purgeBefore:   time.Hour,
        batchSize:     batchSize,
    }

    // Mock Select() to return a non-nil error result
    testDB.errResult = errors.New("database is on vacation")
    _, err := job.getWork(workChan, startID)
    // We expect to get back an error
    test.AssertError(t, err, "no error returned from getWork with bad DB")

    // Mock Select() to return good results and a nil error
    testDB.errResult = nil
    testDB.selectResult = mockIDs

    // We expect to get back no error and the correct lastID
    lastID, err := job.getWork(workChan, startID)
    test.AssertNotError(t, err, "unexpected error from getWork")
    test.AssertEquals(t, lastID, mockIDs[len(mockIDs)-1].ID)

    // We should be able to read one item per mockID and it should match the expected ID
    for i := 0; i < len(mockIDs); i++ {
        got := <-workChan
        test.AssertEquals(t, got, mockIDs[i].ID)
    }

    // We expect the work gauge for this table has been updated
    test.AssertMetricWithLabelsEquals(
        t, workStat, prometheus.Labels{"table": table}, float64(len(mockIDs)))

    // Set the third item in mockIDs to have an expiry after the purge cutoff
    // so we expect to only get the first two items returned from getWork
    testDB.selectResult[2].Expires = clk.Now()
    workStat.Reset()

    // We expect to get back no error and the correct lastID
    lastID, err = job.getWork(workChan, startID)
    test.AssertNotError(t, err, "unexpected error from getWork")
    test.AssertEquals(t, lastID, testDB.selectResult[1].ID)

    for i := 0; i < 2; i++ {
        got := <-workChan
        test.AssertEquals(t, got, mockIDs[i].ID)
    }
    test.AssertMetricWithLabelsEquals(
        t, workStat, prometheus.Labels{"table": table}, 2)
}

func TestDeleteResource(t *testing.T) {
    log, _ := setup()
    table := "certificates"

    testID := int64(1)

    testDB := &mockDB{
        t:               t,
        expectedQuery:   "DELETE FROM certificates WHERE id = ?",
        expectedExecArg: testID,
    }

    // create a batchedDBJob with the simpleResourceDelete function as the
    // deleteHandler
    job := &batchedDBJob{
        db:            testDB,
        log:           log,
        table:         table,
        expiresColumn: "expires",
        deleteHandler: deleteDefault,
    }

    // Mock Exec() to return a non-nil error result
    testDB.errResult = errors.New("database is on vacation")
    err := job.deleteHandler(job, testID)
    // We expect an err result back
    test.AssertError(t, err, "no error returned from deleteHandler with bad DB")
    // We expect no deletes to have been tracked in the deletedStat
    test.AssertMetricWithLabelsEquals(
        t, deletedStat, prometheus.Labels{"table": "certificates"}, 0)

    // With the mock error removed we expect no error returned from simpleDeleteResource
    testDB.errResult = nil
    err = job.deleteHandler(job, testID)
    test.AssertNotError(t, err, "unexpected error from deleteHandler")
    // We expect a delete to have been tracked in the deletedStat
    test.AssertMetricWithLabelsEquals(
        t, deletedStat, prometheus.Labels{"table": "certificates"}, 1)
}

type slowDB struct{}

func (db slowDB) Exec(_ string, _ ...interface{}) (sql.Result, error) {
    time.Sleep(time.Second)
    return nil, nil
}

func (db slowDB) Select(result interface{}, _ string, _ ...interface{}) ([]interface{}, error) {
    return nil, nil
}

func (db slowDB) SelectOne(interface{}, string, ...interface{}) error {
    return errors.New("not implemented")
}

func (db slowDB) Insert(...interface{}) error {
    return errors.New("not implemented")
}

func (db slowDB) Begin() (db.Transaction, error) {
    return nil, errors.New("not implemented")
}

func TestCleanResource(t *testing.T) {
    log, _ := setup()

    // Use a DB that always sleeps for 1 second for each Exec()'d delete.
    db := slowDB{}

    job := batchedDBJob{
        db:            db,
        log:           log,
        table:         "example",
        expiresColumn: "expires",
        // Start with a parallelism of 1
        parallelism:   1,
        deleteHandler: deleteDefault,
    }

    busyWork := func() <-chan int64 {
        work := make(chan int64, 2)
        work <- 1
        work <- 2
        close(work)
        return work
    }

    // Create some work without blocking the test go routine
    work := busyWork()

    // Run cleanResource and track the elapsed time
    start := time.Now()
    job.cleanResource(work)
    elapsed := time.Since(start)

    // With a parallelism of 1 and a sleep of 1 second per delete it should take
    // more than 1 second to delete both IDs in the work channel
    test.Assert(t,
        elapsed >= time.Second,
        fmt.Sprintf("expected parallelism of 1 to take longer than 1 second to delete two rows, took %s", elapsed))

    // Both rows should have been deleted
    expectedLog := `deleted a total of 2 rows from table "example"`
    matches := log.GetAllMatching(expectedLog)
    test.AssertEquals(t, len(matches), 1)

    // Increase the parallelism
    job.parallelism = 2
    // Recreate the work channel
    work = busyWork()
    // Clear the log
    log.Clear()

    // Run cleanResource again and track the elapsed time
    start = time.Now()
    job.cleanResource(work)
    elapsed = time.Since(start)

    // With a parallelism of 2 and a sleep of 1 second per delete it should take
    // less than 1 second to delete both IDs in the work channel
    test.Assert(t,
        elapsed <= time.Second+(time.Millisecond*500),
        fmt.Sprintf("expected parallelism of 2 to take less than 1 second to delete two rows, took %s", elapsed))

    // Both rows should have been deleted
    matches = log.GetAllMatching(expectedLog)
    test.AssertEquals(t, len(matches), 1)

    // Introduce a low max DPS to the job
    job.maxDPS = 1
    // Recreate the work channel
    work = busyWork()
    // Clear the log
    log.Clear()

    // Run cleanResource again and track the elapsed time
    start = time.Now()
    job.cleanResource(work)
    elapsed = time.Since(start)

    // With the maxDPS of 1 the parallelism of 2 should be limited such that it
    // will take more than 1 second to delete both IDs in the work channel once
    // again.
    test.Assert(t,
        elapsed >= time.Second,
        fmt.Sprintf("expected parallelism of 2 with max DPS 1 to take longer than 1 second to delete two rows, took %s", elapsed))

    // Both rows should have been deleted
    matches = log.GetAllMatching(expectedLog)
    test.AssertEquals(t, len(matches), 1)
}

func TestBatchedDBJobValid(t *testing.T) {
    testCases := []struct {
        name        string
        j           batchedDBJob
        expectedErr error
    }{
        {
            name:        "no table",
            j:           batchedDBJob{},
            expectedErr: errNoTable,
        },
        {
            name: "no purgeBefore",
            j: batchedDBJob{
                table: "chef's",
            },
            expectedErr: errNoPurgeBefore,
        },
        {
            name: "too small purgeBefore",
            j: batchedDBJob{
                table:       "chef's",
                purgeBefore: minPurgeBefore,
            },
            expectedErr: errNoPurgeBefore,
        },
        {
            name: "no batchSize",
            j: batchedDBJob{
                table:       "chef's",
                purgeBefore: minPurgeBefore + time.Hour,
            },
            expectedErr: errNoBatchSize,
        },
        {
            name: "no parallelism",
            j: batchedDBJob{
                table:       "chef's",
                purgeBefore: minPurgeBefore + time.Hour,
                batchSize:   1,
            },
            expectedErr: errNoParallelism,
        },
        {
            name: "valid",
            j: batchedDBJob{
                table:         "chef's",
                expiresColumn: "kitchen",
                purgeBefore:   time.Hour * 24 * 91,
                batchSize:     1,
                parallelism:   1,
            },
            expectedErr: nil,
        },
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            err := tc.j.valid()
            test.AssertEquals(t, err, tc.expectedErr)
        })
    }
}
@@ -1,31 +0,0 @@
package main

import (
    "encoding/json"
    "flag"
    "io/ioutil"

    "github.com/letsencrypt/boulder/cmd"
    "github.com/letsencrypt/boulder/cmd/boulder-janitor/janitor"
)

type Config struct {
    Janitor janitor.JanitorConfig
}

func main() {
    configPath := flag.String("config", "config.json", "Path to boulder-janitor configuration file")
    flag.Parse()

    configJSON, err := ioutil.ReadFile(*configPath)
    cmd.FailOnError(err, "Failed to read config file")

    var config Config
    err = json.Unmarshal(configJSON, &config)
    cmd.FailOnError(err, "Failed to parse JSON config")

    j, err := janitor.New(cmd.Clock(), config.Janitor)
    cmd.FailOnError(err, "Failed to build janitor with config")

    j.Run()
}
@@ -1,63 +0,0 @@
{
    "janitor": {
        "syslog": {
            "stdoutLevel": 6
        },
        "db": {
            "dbConnectFile": "test/secrets/janitor_dburl",
            "maxOpenConns": 10
        },
        "debugAddr": ":8014",
        "jobConfigs": [
            {
                "enabled": true,
                "table": "certificates",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "certificateStatus",
                "expiresColumn": "notAfter",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "certificatesPerName",
                "expiresColumn": "time",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "keyHashToSerial",
                "expiresColumn": "certNotAfter",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "orders",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50,
                "deleteHandler": "deleteOrder"
            }
        ]
    }
}
@@ -1,63 +0,0 @@
{
    "janitor": {
        "syslog": {
            "stdoutLevel": 6
        },
        "db": {
            "dbConnectFile": "test/secrets/janitor_dburl",
            "maxOpenConns": 10
        },
        "debugAddr": ":8014",
        "jobConfigs": [
            {
                "enabled": true,
                "table": "certificates",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "certificateStatus",
                "expiresColumn": "notAfter",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "certificatesPerName",
                "expiresColumn": "time",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "keyHashToSerial",
                "expiresColumn": "certNotAfter",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50
            },
            {
                "enabled": true,
                "table": "orders",
                "gracePeriod": "2184h",
                "batchSize": 100,
                "workSleep": "500ms",
                "parallelism": 2,
                "maxDPS": 50,
                "deleteHandler": "deleteOrder"
            }
        ]
    }
}
@@ -87,101 +87,6 @@ def run_expired_authz_purger():
    expect(now, 0, "authz")
    expect(after_grace_period, 1, "authz")

def run_janitor():
    # Set the fake clock to a year in the future such that all of the database
    # rows created during the integration tests are older than the grace period.
    now = datetime.datetime.utcnow()
    target_time = now+datetime.timedelta(days=+365)

    e = os.environ.copy()
    e.setdefault("GORACE", "halt_on_error=1")
    e.setdefault("FAKECLOCK", fakeclock(target_time))

    cmdline = ["./bin/boulder-janitor", "--config", "{0}/janitor.json".format(config_dir)]
    p = subprocess.Popen(cmdline, env=e)

    # Wait for the janitor to come up
    waitport(8014, "boulder-janitor", None)

    def statline(statname, table):
        # NOTE: we omit the trailing "}}" to make this match general enough to
        # permit new labels in the future.
        return "janitor_{0}{{table=\"{1}\"".format(statname, table)

    def get_stat_line(port, stat):
        url = "http://localhost:%d/metrics" % port
        response = requests.get(url)
        for l in response.text.split("\n"):
            if l.strip().startswith(stat):
                return l
        return None

    def stat_value(line):
        parts = line.split(" ")
        if len(parts) != 2:
            raise(Exception("stat line {0} was missing required parts".format(line)))
        return parts[1]

    # Wait for the janitor to finish its work. The easiest way to tell this
    # externally is to watch for the work batch counters to stabilize for
    # a period longer than the configured workSleep.
    attempts = 0
    while True:
        if attempts > 5:
            raise(Exception("timed out waiting for janitor workbatch counts to stabilize"))

        certStatusWorkBatch = get_stat_line(8014, statline("workbatch", "certificateStatus"))
        certsWorkBatch = get_stat_line(8014, statline("workbatch", "certificates"))
        certsPerNameWorkBatch = get_stat_line(8014, statline("workbatch", "certificatesPerName"))
        ordersWorkBatch = get_stat_line(8014, statline("workbatch", "orders"))

        # sleep for double the configured workSleep for each job
        time.sleep(1)

        newCertStatusWorkBatch = get_stat_line(8014, statline("workbatch", "certificateStatus"))
        newCertsWorkBatch = get_stat_line(8014, statline("workbatch", "certificates"))
        newCertsPerNameWorkBatch = get_stat_line(8014, statline("workbatch", "certificatesPerName"))
        newOrdersWorkBatch = get_stat_line(8014, statline("workbatch", "orders"))

        if (certStatusWorkBatch == newCertStatusWorkBatch
                and certsWorkBatch == newCertsWorkBatch
                and certsPerNameWorkBatch == newCertsPerNameWorkBatch
                and ordersWorkBatch == newOrdersWorkBatch):
            break

        attempts = attempts + 1

    # Check deletion stats are not empty/zero
    for i in range(10):
        certStatusDeletes = get_stat_line(8014, statline("deletions", "certificateStatus"))
        certsDeletes = get_stat_line(8014, statline("deletions", "certificates"))
        certsPerNameDeletes = get_stat_line(8014, statline("deletions", "certificatesPerName"))
        ordersDeletes = get_stat_line(8014, statline("deletions", "orders"))

        if certStatusDeletes is None or certsDeletes is None or certsPerNameDeletes is None or ordersDeletes is None:
            print("delete stats not present after check {0}. Sleeping".format(i))
            time.sleep(2)
            continue

        for l in [certStatusDeletes, certsDeletes, certsPerNameDeletes, ordersDeletes]:
            if stat_value(l) == "0":
                raise(Exception("Expected a non-zero number of deletes to be performed. Found {0}".format(l)))

    # Check that all error stats are empty
    errorStats = [
        statline("errors", "certificateStatus"),
        statline("errors", "certificates"),
        statline("errors", "certificatesPerName"),
        statline("errors", "orders"),
    ]
    for eStat in errorStats:
        actual = get_stat_line(8014, eStat)
        if actual is not None:
            raise(Exception("Expected to find no error stat lines but found {0}\n".format(eStat)))

    # Terminate the janitor
    p.terminate()

def test_single_ocsp():
    """Run ocsp-responder with the single OCSP response generated for the intermediate
    certificate using the ceremony tool during setup and check that it successfully

@@ -278,11 +183,6 @@ def main():
    if not CONFIG_NEXT:
        run_expired_authz_purger()

    # Run the boulder-janitor. This should happen after all other tests because
    # it runs with the fake clock set to the future and deletes rows that may
    # otherwise be referenced by tests.
    run_janitor()

    # Run the load-generator last. run_loadtest will stop the
    # pebble-challtestsrv before running the load-generator and will not restart
    # it.
@@ -14,7 +14,6 @@ CREATE USER IF NOT EXISTS 'ocsp_update'@'localhost';
CREATE USER IF NOT EXISTS 'ocsp_update_ro'@'localhost';
CREATE USER IF NOT EXISTS 'test_setup'@'localhost';
CREATE USER IF NOT EXISTS 'purger'@'localhost';
CREATE USER IF NOT EXISTS 'janitor'@'localhost';
CREATE USER IF NOT EXISTS 'badkeyrevoker'@'localhost';

-- Storage Authority

@@ -79,16 +78,6 @@ GRANT SELECT ON certificates TO 'cert_checker'@'localhost';
-- Expired authorization purger
GRANT SELECT,DELETE ON authz2 TO 'purger'@'localhost';

-- Janitor
GRANT SELECT,DELETE ON certificates TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON certificateStatus TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON certificatesPerName TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON keyHashToSerial TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON orders TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON requestedNames TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON orderFqdnSets TO 'janitor'@'localhost';
GRANT SELECT,DELETE ON orderToAuthz2 TO 'janitor'@'localhost';

-- Bad Key Revoker
GRANT SELECT,UPDATE ON blockedKeys TO 'badkeyrevoker'@'localhost';
GRANT SELECT ON keyHashToSerial TO 'badkeyrevoker'@'localhost';
@@ -1 +0,0 @@
janitor@tcp(boulder-mysql:3306)/boulder_sa_integration