Refactor ocsp_updater into a separate package (#5805)

This pulls most of ocsp_updater into its own package. This follows more
closely the pattern we use elsewhere, where we have a struct for the
service, and a main.go that sets it up.

Also splits out the JSON config into a third package,
`ocsp_updater/config`, to avoid circular dependency issues.

The only changes to the implementation of OCSPUpdater are renaming
`newUpdater` to `ocsp_updater.New`, and `tick` to `Tick`. Also moved
some testdata into `ocsp_updater/testdata/` and updated the tests
appropriately. And changed some `configureDb` calls in the unittests to
use sa.NewDbMap instead.
This commit is contained in:
Jacob Hoffman-Andrews 2021-11-18 16:51:28 -08:00 committed by GitHub
parent dfc3ea434f
commit 831f9d89f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 500 additions and 495 deletions

View File

@ -1,485 +1,31 @@
package notmain
import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/go-sql-driver/mysql"
"github.com/honeycombio/beeline-go"
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
capb "github.com/letsencrypt/boulder/ca/proto"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
"github.com/letsencrypt/boulder/features"
bgrpc "github.com/letsencrypt/boulder/grpc"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/ocsp_updater"
ocsp_updater_config "github.com/letsencrypt/boulder/ocsp_updater/config"
"github.com/letsencrypt/boulder/sa"
)
// ocspDB and ocspReadOnlyDB are interfaces collecting the `sql.DB` methods that
// the various parts of OCSPUpdater rely on. Using this adapter shim allows tests to
// swap out the `sql.DB` implementation.
// ocspReadOnlyDb provides only read-only portions of the `sql.DB` interface.
type ocspReadOnlyDb interface {
Query(query string, args ...interface{}) (*sql.Rows, error)
}
// ocspDb provides read-write portions of the `sql.DB` interface.
type ocspDb interface {
ocspReadOnlyDb
Exec(query string, args ...interface{}) (sql.Result, error)
}
// failCounter provides a concurrent safe counter.
type failCounter struct {
mu sync.Mutex
count int
}
func (c *failCounter) Add(i int) {
c.mu.Lock()
defer c.mu.Unlock()
c.count += i
}
func (c *failCounter) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.count = 0
}
func (c *failCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// OCSPUpdater contains the useful objects for the Updater
type OCSPUpdater struct {
log blog.Logger
clk clock.Clock
db ocspDb
readOnlyDb ocspReadOnlyDb
ogc capb.OCSPGeneratorClient
tickWindow time.Duration
batchSize int
tickHistogram *prometheus.HistogramVec
maxBackoff time.Duration
backoffFactor float64
readFailures failCounter
serialSuffixes []string
queryBody string
// Used to calculate how far back stale OCSP responses should be looked for
ocspMinTimeToExpiry time.Duration
// Maximum number of individual OCSP updates to attempt in parallel. Making
// these requests in parallel allows us to get higher total throughput.
parallelGenerateOCSPRequests int
stalenessHistogram prometheus.Histogram
genStoreHistogram prometheus.Histogram
generatedCounter *prometheus.CounterVec
storedCounter *prometheus.CounterVec
markExpiredCounter *prometheus.CounterVec
findStaleOCSPCounter *prometheus.CounterVec
}
func newUpdater(
stats prometheus.Registerer,
clk clock.Clock,
db ocspDb,
readOnlyDb ocspReadOnlyDb,
serialSuffixes []string,
ogc capb.OCSPGeneratorClient,
config OCSPUpdaterConfig,
log blog.Logger,
) (*OCSPUpdater, error) {
if config.OldOCSPBatchSize == 0 {
return nil, fmt.Errorf("Loop batch sizes must be non-zero")
}
if config.OldOCSPWindow.Duration == 0 {
return nil, fmt.Errorf("Loop window sizes must be non-zero")
}
if config.ParallelGenerateOCSPRequests == 0 {
// Default to 1
config.ParallelGenerateOCSPRequests = 1
}
for _, s := range serialSuffixes {
if len(s) != 1 || strings.ToLower(s) != s {
return nil, fmt.Errorf("serial suffixes must all be one lowercase character, got %q, expected %q", s, strings.ToLower(s))
}
c := s[0]
if !(c >= '0' && c <= '9' || c >= 'a' && c <= 'f') {
return nil, errors.New("valid range for suffixes is [0-9a-f]")
}
}
var queryBody strings.Builder
queryBody.WriteString("WHERE ocspLastUpdated < ? AND NOT isExpired ")
if len(serialSuffixes) > 0 {
fmt.Fprintf(&queryBody, "AND RIGHT(serial, 1) IN ( %s ) ",
getQuestionsForShardList(len(serialSuffixes)),
)
}
queryBody.WriteString("ORDER BY ocspLastUpdated ASC LIMIT ?")
genStoreHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ocsp_updater_generate_and_store",
Help: "A histogram of latencies of OCSP generation and storage latencies",
})
stats.MustRegister(genStoreHistogram)
generatedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ocsp_updater_generated",
Help: "A counter of OCSP response generation calls labeled by result",
}, []string{"result"})
stats.MustRegister(generatedCounter)
storedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ocsp_updater_stored",
Help: "A counter of OCSP response storage calls labeled by result",
}, []string{"result"})
stats.MustRegister(storedCounter)
tickHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ocsp_updater_ticks",
Help: "A histogram of ocsp-updater tick latencies labelled by result and whether the tick was considered longer than expected",
Buckets: []float64{0.01, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000},
}, []string{"result", "long"})
stats.MustRegister(tickHistogram)
stalenessHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ocsp_status_staleness",
Help: "How long past the refresh time a status is when we try to refresh it. Will always be > 0, but must stay well below 12 hours.",
Buckets: []float64{10, 100, 1000, 10000, 21600, 32400, 36000, 39600, 43200, 54000, 64800, 75600, 86400, 108000, 129600, 172800},
})
stats.MustRegister(stalenessHistogram)
markExpiredCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "mark_expired",
Help: "A counter of mark expired calls labeled by result",
}, []string{"result"})
stats.MustRegister(markExpiredCounter)
findStaleOCSPCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "find_stale_ocsp",
Help: "A counter of query for stale OCSP responses labeled by result",
}, []string{"result"})
stats.MustRegister(findStaleOCSPCounter)
updater := OCSPUpdater{
clk: clk,
db: db,
readOnlyDb: readOnlyDb,
ogc: ogc,
log: log,
ocspMinTimeToExpiry: config.OCSPMinTimeToExpiry.Duration,
parallelGenerateOCSPRequests: config.ParallelGenerateOCSPRequests,
genStoreHistogram: genStoreHistogram,
generatedCounter: generatedCounter,
storedCounter: storedCounter,
markExpiredCounter: markExpiredCounter,
findStaleOCSPCounter: findStaleOCSPCounter,
stalenessHistogram: stalenessHistogram,
tickHistogram: tickHistogram,
tickWindow: config.OldOCSPWindow.Duration,
batchSize: config.OldOCSPBatchSize,
maxBackoff: config.SignFailureBackoffMax.Duration,
backoffFactor: config.SignFailureBackoffFactor,
serialSuffixes: serialSuffixes,
queryBody: queryBody.String(),
}
return &updater, nil
}
func getQuestionsForShardList(count int) string {
return strings.TrimRight(strings.Repeat("?,", count), ",")
}
// findStaleOCSPResponses sends a goroutine to fetch rows of stale OCSP
// responses from the database and returns results on a channel.
func (updater *OCSPUpdater) findStaleOCSPResponses(ctx context.Context, oldestLastUpdatedTime time.Time, batchSize int) <-chan sa.CertStatusMetadata {
// staleStatusesOut channel contains all stale ocsp responses that need
// updating.
staleStatusesOut := make(chan sa.CertStatusMetadata)
args := make([]interface{}, 0)
args = append(args, oldestLastUpdatedTime)
// If serialSuffixes is unset, this will be deliberately a no-op.
for _, c := range updater.serialSuffixes {
args = append(args, c)
}
args = append(args, batchSize)
go func() {
defer close(staleStatusesOut)
rows, err := updater.readOnlyDb.Query(
fmt.Sprintf(
"SELECT %s FROM certificateStatus %s",
strings.Join(sa.CertStatusMetadataFields(), ","),
updater.queryBody,
),
args...,
)
// If error, log and increment retries for backoff. Else no
// error, proceed to push statuses to channel.
if err != nil {
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
for rows.Next() {
var status sa.CertStatusMetadata
err := sa.ScanCertStatusMetadataRow(rows, &status)
if err != nil {
rows.Close()
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
staleness := oldestLastUpdatedTime.Sub(status.OCSPLastUpdated).Seconds()
updater.stalenessHistogram.Observe(staleness)
select {
case <-ctx.Done():
return
case staleStatusesOut <- status:
}
}
// Ensure the query wasn't interrupted before it could complete.
err = rows.Close()
if err != nil {
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
updater.findStaleOCSPCounter.WithLabelValues("success").Inc()
updater.readFailures.Reset()
}()
return staleStatusesOut
}
// generateResponse signs an new OCSP response for a given certStatus row.
// Takes its argument by value to force a copy, then returns a reference to that copy.
func (updater *OCSPUpdater) generateResponse(ctx context.Context, status sa.CertStatusMetadata) (*sa.CertStatusMetadata, error) {
if status.IssuerID == 0 {
return nil, errors.New("cert status has 0 IssuerID")
}
ocspReq := capb.GenerateOCSPRequest{
Serial: status.Serial,
IssuerID: status.IssuerID,
Status: string(status.Status),
Reason: int32(status.RevokedReason),
RevokedAt: status.RevokedDate.UnixNano(),
}
ocspResponse, err := updater.ogc.GenerateOCSP(ctx, &ocspReq)
if err != nil {
return nil, err
}
status.OCSPLastUpdated = updater.clk.Now()
status.OCSPResponse = ocspResponse.Response
return &status, nil
}
// storeResponse stores a given CertificateStatus in the database.
func (updater *OCSPUpdater) storeResponse(status *sa.CertStatusMetadata) error {
// Update the certificateStatus table with the new OCSP response, the status
// WHERE is used make sure we don't overwrite a revoked response with a one
// containing a 'good' status.
_, err := updater.db.Exec(
`UPDATE certificateStatus
SET ocspResponse=?,ocspLastUpdated=?
WHERE id=?
AND status=?`,
status.OCSPResponse,
status.OCSPLastUpdated,
status.ID,
string(status.Status),
)
return err
}
// markExpired updates a given CertificateStatus to have `isExpired` set.
func (updater *OCSPUpdater) markExpired(status sa.CertStatusMetadata) error {
_, err := updater.db.Exec(
`UPDATE certificateStatus
SET isExpired = TRUE
WHERE id = ?`,
status.ID,
)
return err
}
// processExpired is a pipeline step to process a channel of
// `core.CertificateStatus` and set `isExpired` in the database.
func (updater *OCSPUpdater) processExpired(ctx context.Context, staleStatusesIn <-chan sa.CertStatusMetadata) <-chan sa.CertStatusMetadata {
tickStart := updater.clk.Now()
staleStatusesOut := make(chan sa.CertStatusMetadata)
go func() {
defer close(staleStatusesOut)
for status := range staleStatusesIn {
if !status.IsExpired && tickStart.After(status.NotAfter) {
err := updater.markExpired(status)
if err != nil {
// Update error counters and log
updater.log.AuditErrf("Failed to set certificate expired: %s", err)
updater.markExpiredCounter.WithLabelValues("failed").Inc()
} else {
updater.markExpiredCounter.WithLabelValues("success").Inc()
}
}
select {
case <-ctx.Done():
return
case staleStatusesOut <- status:
}
}
}()
return staleStatusesOut
}
// generateOCSPResponses is the final stage of a pipeline. It takes a
// channel of `core.CertificateStatus` and sends a goroutine for each to
// obtain a new OCSP response and update the status in the database.
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, staleStatusesIn <-chan sa.CertStatusMetadata) {
// Use the semaphore pattern from
// https://github.com/golang/go/wiki/BoundingResourceUse to send a number of
// GenerateOCSP / storeResponse requests in parallel, while limiting the total number of
// outstanding requests. The number of outstanding requests equals the
// capacity of the channel.
sem := make(chan int, updater.parallelGenerateOCSPRequests)
wait := func() {
sem <- 1 // Block until there's capacity.
}
done := func(start time.Time) {
<-sem // Indicate there's more capacity.
updater.genStoreHistogram.Observe(time.Since(start).Seconds())
}
// Work runs as a goroutine per ocsp response to obtain a new ocsp
// response and store it in the database.
work := func(status sa.CertStatusMetadata) {
defer done(updater.clk.Now())
meta, err := updater.generateResponse(ctx, status)
if err != nil {
updater.log.AuditErrf("Failed to generate OCSP response: %s", err)
updater.generatedCounter.WithLabelValues("failed").Inc()
return
}
updater.generatedCounter.WithLabelValues("success").Inc()
err = updater.storeResponse(meta)
if err != nil {
updater.log.AuditErrf("Failed to store OCSP response: %s", err)
updater.storedCounter.WithLabelValues("failed").Inc()
return
}
updater.storedCounter.WithLabelValues("success").Inc()
}
// Consume the stale statuses channel and send off a sign/store request
// for each stale response.
for status := range staleStatusesIn {
wait()
go work(status)
}
// Block until the sem channel reaches its full capacity again,
// indicating each goroutine has completed.
for i := 0; i < updater.parallelGenerateOCSPRequests; i++ {
wait()
}
}
type config struct {
OCSPUpdater OCSPUpdaterConfig
OCSPUpdater ocsp_updater_config.Config
Syslog cmd.SyslogConfig
Beeline cmd.BeelineConfig
}
// OCSPUpdaterConfig provides the various window tick times and batch sizes needed
// for the OCSP (and SCT) updater
type OCSPUpdaterConfig struct {
cmd.ServiceConfig
DB cmd.DBConfig
ReadOnlyDB cmd.DBConfig
OldOCSPWindow cmd.ConfigDuration
OldOCSPBatchSize int
OCSPMinTimeToExpiry cmd.ConfigDuration
ParallelGenerateOCSPRequests int
SignFailureBackoffFactor float64
SignFailureBackoffMax cmd.ConfigDuration
SerialSuffixShards string
OCSPGeneratorService *cmd.GRPCClientConfig
Features map[string]bool
}
func (updater *OCSPUpdater) tick() {
start := updater.clk.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldestLastUpdatedTime := updater.clk.Now().Add(-updater.ocspMinTimeToExpiry)
// Run pipeline
updater.generateOCSPResponses(ctx, updater.processExpired(ctx, updater.findStaleOCSPResponses(ctx, oldestLastUpdatedTime, updater.batchSize)))
end := updater.clk.Now()
took := end.Sub(start)
long, state := "false", "success"
if took > updater.tickWindow {
long = "true"
}
// Set sleep duration to the configured tickWindow.
sleepDur := start.Add(updater.tickWindow).Sub(end)
// Set sleep duration higher to backoff starting the next tick and
// reading from the database if the last read failed.
readFails := updater.readFailures.Value()
if readFails > 0 {
sleepDur = core.RetryBackoff(
readFails,
updater.tickWindow,
updater.maxBackoff,
updater.backoffFactor,
)
}
updater.tickHistogram.WithLabelValues(state, long).Observe(took.Seconds())
updater.clk.Sleep(sleepDur)
}
func configureDb(dbConfig cmd.DBConfig) (*sql.DB, error) {
dsn, err := dbConfig.URL()
if err != nil {
@ -578,7 +124,7 @@ func main() {
serialSuffixes = strings.Fields(c.OCSPUpdater.SerialSuffixShards)
}
updater, err := newUpdater(
updater, err := ocsp_updater.New(
stats,
clk,
db,
@ -593,7 +139,7 @@ func main() {
go cmd.CatchSignals(logger, nil)
for {
updater.tick()
updater.Tick()
}
}

View File

@ -0,0 +1,26 @@
package ocsp_updater_config
import "github.com/letsencrypt/boulder/cmd"
// Config provides the various window tick times and batch sizes needed
// for the OCSP updater
type Config struct {
cmd.ServiceConfig
DB cmd.DBConfig
ReadOnlyDB cmd.DBConfig
OldOCSPWindow cmd.ConfigDuration
OldOCSPBatchSize int
OCSPMinTimeToExpiry cmd.ConfigDuration
ParallelGenerateOCSPRequests int
SignFailureBackoffFactor float64
SignFailureBackoffMax cmd.ConfigDuration
SerialSuffixShards string
OCSPGeneratorService *cmd.GRPCClientConfig
Features map[string]bool
}

445
ocsp_updater/updater.go Normal file
View File

@ -0,0 +1,445 @@
package ocsp_updater
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
capb "github.com/letsencrypt/boulder/ca/proto"
"github.com/letsencrypt/boulder/core"
blog "github.com/letsencrypt/boulder/log"
ocsp_updater_config "github.com/letsencrypt/boulder/ocsp_updater/config"
"github.com/letsencrypt/boulder/sa"
)
// ocspDB and ocspReadOnlyDB are interfaces collecting the `sql.DB` methods that
// the various parts of OCSPUpdater rely on. Using this adapter shim allows tests to
// swap out the `sql.DB` implementation.
// ocspReadOnlyDb provides only read-only portions of the `sql.DB` interface.
type ocspReadOnlyDb interface {
Query(query string, args ...interface{}) (*sql.Rows, error)
}
// ocspDb provides read-write portions of the `sql.DB` interface.
type ocspDb interface {
ocspReadOnlyDb
Exec(query string, args ...interface{}) (sql.Result, error)
}
// failCounter provides a concurrent safe counter.
type failCounter struct {
mu sync.Mutex
count int
}
func (c *failCounter) Add(i int) {
c.mu.Lock()
defer c.mu.Unlock()
c.count += i
}
func (c *failCounter) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.count = 0
}
func (c *failCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// OCSPUpdater contains the useful objects for the Updater
type OCSPUpdater struct {
log blog.Logger
clk clock.Clock
db ocspDb
readOnlyDb ocspReadOnlyDb
ogc capb.OCSPGeneratorClient
tickWindow time.Duration
batchSize int
tickHistogram *prometheus.HistogramVec
maxBackoff time.Duration
backoffFactor float64
readFailures failCounter
serialSuffixes []string
queryBody string
// Used to calculate how far back stale OCSP responses should be looked for
ocspMinTimeToExpiry time.Duration
// Maximum number of individual OCSP updates to attempt in parallel. Making
// these requests in parallel allows us to get higher total throughput.
parallelGenerateOCSPRequests int
stalenessHistogram prometheus.Histogram
genStoreHistogram prometheus.Histogram
generatedCounter *prometheus.CounterVec
storedCounter *prometheus.CounterVec
markExpiredCounter *prometheus.CounterVec
findStaleOCSPCounter *prometheus.CounterVec
}
func New(
stats prometheus.Registerer,
clk clock.Clock,
db ocspDb,
readOnlyDb ocspReadOnlyDb,
serialSuffixes []string,
ogc capb.OCSPGeneratorClient,
config ocsp_updater_config.Config,
log blog.Logger,
) (*OCSPUpdater, error) {
if config.OldOCSPBatchSize == 0 {
return nil, fmt.Errorf("Loop batch sizes must be non-zero")
}
if config.OldOCSPWindow.Duration == 0 {
return nil, fmt.Errorf("Loop window sizes must be non-zero")
}
if config.ParallelGenerateOCSPRequests == 0 {
// Default to 1
config.ParallelGenerateOCSPRequests = 1
}
for _, s := range serialSuffixes {
if len(s) != 1 || strings.ToLower(s) != s {
return nil, fmt.Errorf("serial suffixes must all be one lowercase character, got %q, expected %q", s, strings.ToLower(s))
}
c := s[0]
if !(c >= '0' && c <= '9' || c >= 'a' && c <= 'f') {
return nil, errors.New("valid range for suffixes is [0-9a-f]")
}
}
var queryBody strings.Builder
queryBody.WriteString("WHERE ocspLastUpdated < ? AND NOT isExpired ")
if len(serialSuffixes) > 0 {
fmt.Fprintf(&queryBody, "AND RIGHT(serial, 1) IN ( %s ) ",
getQuestionsForShardList(len(serialSuffixes)),
)
}
queryBody.WriteString("ORDER BY ocspLastUpdated ASC LIMIT ?")
genStoreHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ocsp_updater_generate_and_store",
Help: "A histogram of latencies of OCSP generation and storage latencies",
})
stats.MustRegister(genStoreHistogram)
generatedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ocsp_updater_generated",
Help: "A counter of OCSP response generation calls labeled by result",
}, []string{"result"})
stats.MustRegister(generatedCounter)
storedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ocsp_updater_stored",
Help: "A counter of OCSP response storage calls labeled by result",
}, []string{"result"})
stats.MustRegister(storedCounter)
tickHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ocsp_updater_ticks",
Help: "A histogram of ocsp-updater tick latencies labelled by result and whether the tick was considered longer than expected",
Buckets: []float64{0.01, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000},
}, []string{"result", "long"})
stats.MustRegister(tickHistogram)
stalenessHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ocsp_status_staleness",
Help: "How long past the refresh time a status is when we try to refresh it. Will always be > 0, but must stay well below 12 hours.",
Buckets: []float64{10, 100, 1000, 10000, 21600, 32400, 36000, 39600, 43200, 54000, 64800, 75600, 86400, 108000, 129600, 172800},
})
stats.MustRegister(stalenessHistogram)
markExpiredCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "mark_expired",
Help: "A counter of mark expired calls labeled by result",
}, []string{"result"})
stats.MustRegister(markExpiredCounter)
findStaleOCSPCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "find_stale_ocsp",
Help: "A counter of query for stale OCSP responses labeled by result",
}, []string{"result"})
stats.MustRegister(findStaleOCSPCounter)
updater := OCSPUpdater{
clk: clk,
db: db,
readOnlyDb: readOnlyDb,
ogc: ogc,
log: log,
ocspMinTimeToExpiry: config.OCSPMinTimeToExpiry.Duration,
parallelGenerateOCSPRequests: config.ParallelGenerateOCSPRequests,
genStoreHistogram: genStoreHistogram,
generatedCounter: generatedCounter,
storedCounter: storedCounter,
markExpiredCounter: markExpiredCounter,
findStaleOCSPCounter: findStaleOCSPCounter,
stalenessHistogram: stalenessHistogram,
tickHistogram: tickHistogram,
tickWindow: config.OldOCSPWindow.Duration,
batchSize: config.OldOCSPBatchSize,
maxBackoff: config.SignFailureBackoffMax.Duration,
backoffFactor: config.SignFailureBackoffFactor,
serialSuffixes: serialSuffixes,
queryBody: queryBody.String(),
}
return &updater, nil
}
func getQuestionsForShardList(count int) string {
return strings.TrimRight(strings.Repeat("?,", count), ",")
}
// findStaleOCSPResponses sends a goroutine to fetch rows of stale OCSP
// responses from the database and returns results on a channel.
func (updater *OCSPUpdater) findStaleOCSPResponses(ctx context.Context, oldestLastUpdatedTime time.Time, batchSize int) <-chan sa.CertStatusMetadata {
// staleStatusesOut channel contains all stale ocsp responses that need
// updating.
staleStatusesOut := make(chan sa.CertStatusMetadata)
args := make([]interface{}, 0)
args = append(args, oldestLastUpdatedTime)
// If serialSuffixes is unset, this will be deliberately a no-op.
for _, c := range updater.serialSuffixes {
args = append(args, c)
}
args = append(args, batchSize)
go func() {
defer close(staleStatusesOut)
rows, err := updater.readOnlyDb.Query(
fmt.Sprintf(
"SELECT %s FROM certificateStatus %s",
strings.Join(sa.CertStatusMetadataFields(), ","),
updater.queryBody,
),
args...,
)
// If error, log and increment retries for backoff. Else no
// error, proceed to push statuses to channel.
if err != nil {
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
for rows.Next() {
var status sa.CertStatusMetadata
err := sa.ScanCertStatusMetadataRow(rows, &status)
if err != nil {
rows.Close()
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
staleness := oldestLastUpdatedTime.Sub(status.OCSPLastUpdated).Seconds()
updater.stalenessHistogram.Observe(staleness)
select {
case <-ctx.Done():
return
case staleStatusesOut <- status:
}
}
// Ensure the query wasn't interrupted before it could complete.
err = rows.Close()
if err != nil {
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
updater.readFailures.Add(1)
return
}
updater.findStaleOCSPCounter.WithLabelValues("success").Inc()
updater.readFailures.Reset()
}()
return staleStatusesOut
}
// generateResponse signs an new OCSP response for a given certStatus row.
// Takes its argument by value to force a copy, then returns a reference to that copy.
func (updater *OCSPUpdater) generateResponse(ctx context.Context, status sa.CertStatusMetadata) (*sa.CertStatusMetadata, error) {
if status.IssuerID == 0 {
return nil, errors.New("cert status has 0 IssuerID")
}
ocspReq := capb.GenerateOCSPRequest{
Serial: status.Serial,
IssuerID: status.IssuerID,
Status: string(status.Status),
Reason: int32(status.RevokedReason),
RevokedAt: status.RevokedDate.UnixNano(),
}
ocspResponse, err := updater.ogc.GenerateOCSP(ctx, &ocspReq)
if err != nil {
return nil, err
}
status.OCSPLastUpdated = updater.clk.Now()
status.OCSPResponse = ocspResponse.Response
return &status, nil
}
// storeResponse stores a given CertificateStatus in the database.
func (updater *OCSPUpdater) storeResponse(status *sa.CertStatusMetadata) error {
// Update the certificateStatus table with the new OCSP response, the status
// WHERE is used make sure we don't overwrite a revoked response with a one
// containing a 'good' status.
_, err := updater.db.Exec(
`UPDATE certificateStatus
SET ocspResponse=?,ocspLastUpdated=?
WHERE id=?
AND status=?`,
status.OCSPResponse,
status.OCSPLastUpdated,
status.ID,
string(status.Status),
)
return err
}
// markExpired updates a given CertificateStatus to have `isExpired` set.
func (updater *OCSPUpdater) markExpired(status sa.CertStatusMetadata) error {
_, err := updater.db.Exec(
`UPDATE certificateStatus
SET isExpired = TRUE
WHERE id = ?`,
status.ID,
)
return err
}
// processExpired is a pipeline step to process a channel of
// `core.CertificateStatus` and set `isExpired` in the database.
func (updater *OCSPUpdater) processExpired(ctx context.Context, staleStatusesIn <-chan sa.CertStatusMetadata) <-chan sa.CertStatusMetadata {
tickStart := updater.clk.Now()
staleStatusesOut := make(chan sa.CertStatusMetadata)
go func() {
defer close(staleStatusesOut)
for status := range staleStatusesIn {
if !status.IsExpired && tickStart.After(status.NotAfter) {
err := updater.markExpired(status)
if err != nil {
// Update error counters and log
updater.log.AuditErrf("Failed to set certificate expired: %s", err)
updater.markExpiredCounter.WithLabelValues("failed").Inc()
} else {
updater.markExpiredCounter.WithLabelValues("success").Inc()
}
}
select {
case <-ctx.Done():
return
case staleStatusesOut <- status:
}
}
}()
return staleStatusesOut
}
// generateOCSPResponses is the final stage of a pipeline. It takes a
// channel of `core.CertificateStatus` and sends a goroutine for each to
// obtain a new OCSP response and update the status in the database.
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, staleStatusesIn <-chan sa.CertStatusMetadata) {
// Use the semaphore pattern from
// https://github.com/golang/go/wiki/BoundingResourceUse to send a number of
// GenerateOCSP / storeResponse requests in parallel, while limiting the total number of
// outstanding requests. The number of outstanding requests equals the
// capacity of the channel.
sem := make(chan int, updater.parallelGenerateOCSPRequests)
wait := func() {
sem <- 1 // Block until there's capacity.
}
done := func(start time.Time) {
<-sem // Indicate there's more capacity.
updater.genStoreHistogram.Observe(time.Since(start).Seconds())
}
// Work runs as a goroutine per ocsp response to obtain a new ocsp
// response and store it in the database.
work := func(status sa.CertStatusMetadata) {
defer done(updater.clk.Now())
meta, err := updater.generateResponse(ctx, status)
if err != nil {
updater.log.AuditErrf("Failed to generate OCSP response: %s", err)
updater.generatedCounter.WithLabelValues("failed").Inc()
return
}
updater.generatedCounter.WithLabelValues("success").Inc()
err = updater.storeResponse(meta)
if err != nil {
updater.log.AuditErrf("Failed to store OCSP response: %s", err)
updater.storedCounter.WithLabelValues("failed").Inc()
return
}
updater.storedCounter.WithLabelValues("success").Inc()
}
// Consume the stale statuses channel and send off a sign/store request
// for each stale response.
for status := range staleStatusesIn {
wait()
go work(status)
}
// Block until the sem channel reaches its full capacity again,
// indicating each goroutine has completed.
for i := 0; i < updater.parallelGenerateOCSPRequests; i++ {
wait()
}
}
func (updater *OCSPUpdater) Tick() {
start := updater.clk.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldestLastUpdatedTime := updater.clk.Now().Add(-updater.ocspMinTimeToExpiry)
// Run pipeline
updater.generateOCSPResponses(ctx, updater.processExpired(ctx, updater.findStaleOCSPResponses(ctx, oldestLastUpdatedTime, updater.batchSize)))
end := updater.clk.Now()
took := end.Sub(start)
long, state := "false", "success"
if took > updater.tickWindow {
long = "true"
}
// Set sleep duration to the configured tickWindow.
sleepDur := start.Add(updater.tickWindow).Sub(end)
// Set sleep duration higher to backoff starting the next tick and
// reading from the database if the last read failed.
readFails := updater.readFailures.Value()
if readFails > 0 {
sleepDur = core.RetryBackoff(
readFails,
updater.tickWindow,
updater.maxBackoff,
updater.backoffFactor,
)
}
updater.tickHistogram.WithLabelValues(state, long).Observe(took.Seconds())
updater.clk.Sleep(sleepDur)
}

View File

@ -1,4 +1,4 @@
package notmain
package ocsp_updater
import (
"context"
@ -21,6 +21,7 @@ import (
bgrpc "github.com/letsencrypt/boulder/grpc"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
ocsp_updater_config "github.com/letsencrypt/boulder/ocsp_updater/config"
"github.com/letsencrypt/boulder/sa"
sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/sa/satest"
@ -47,6 +48,9 @@ var log = blog.UseMock()
func setup(t *testing.T) (*OCSPUpdater, sapb.StorageAuthorityClient, *db.WrappedMap, clock.FakeClock, func()) {
dbMap, err := sa.NewDbMap(vars.DBConnSA, sa.DbSettings{})
test.AssertNotError(t, err, "Failed to create dbMap")
readOnlyDb, err := sa.NewDbMap(vars.DBConnSAOcspUpdateRO, sa.DbSettings{})
test.AssertNotError(t, err, "Failed to create dbMap")
cleanUp := test.ResetSATestDatabase(t)
sa.SetSQLDebug(dbMap, log)
fc := clock.NewFake()
@ -55,30 +59,14 @@ func setup(t *testing.T) (*OCSPUpdater, sapb.StorageAuthorityClient, *db.Wrapped
sa, err := sa.NewSQLStorageAuthority(dbMap, dbMap, fc, log, metrics.NoopRegisterer, 1)
test.AssertNotError(t, err, "Failed to create SA")
cleanUp := test.ResetSATestDatabase(t)
db, err := configureDb(
cmd.DBConfig{
DBConnect: vars.DBConnSAOcspUpdate,
},
)
test.AssertNotError(t, err, "Failed to create database client")
readOnlyDb, err := configureDb(
cmd.DBConfig{
DBConnect: vars.DBConnSAOcspUpdateRO,
},
)
test.AssertNotError(t, err, "Failed to create read-only database client")
updater, err := newUpdater(
updater, err := New(
metrics.NoopRegisterer,
fc,
db,
dbMap,
readOnlyDb,
strings.Fields("0 1 2 3 4 5 6 7 8 9 a b c d e f"),
&mockOCSP{},
OCSPUpdaterConfig{
ocsp_updater_config.Config{
OldOCSPBatchSize: 1,
OldOCSPWindow: cmd.ConfigDuration{Duration: time.Second},
SignFailureBackoffFactor: 1.5,
@ -102,7 +90,7 @@ func TestStalenessHistogram(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sac)
parsedCertA, err := core.LoadCert("test-cert.pem")
parsedCertA, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sac.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCertA.Raw,
@ -112,7 +100,7 @@ func TestStalenessHistogram(t *testing.T) {
IssuerID: 1,
})
test.AssertNotError(t, err, "Couldn't add test-cert.pem")
parsedCertB, err := core.LoadCert("test-cert-b.pem")
parsedCertB, err := core.LoadCert("testdata/test-cert-b.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sac.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCertB.Raw,
@ -145,7 +133,7 @@ func TestGenerateAndStoreOCSPResponse(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCert.Raw,
@ -193,7 +181,7 @@ func TestGenerateOCSPResponses(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCertA, err := core.LoadCert("test-cert.pem")
parsedCertA, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCertA.Raw,
@ -203,7 +191,7 @@ func TestGenerateOCSPResponses(t *testing.T) {
IssuerID: 1,
})
test.AssertNotError(t, err, "Couldn't add test-cert.pem")
parsedCertB, err := core.LoadCert("test-cert-b.pem")
parsedCertB, err := core.LoadCert("testdata/test-cert-b.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCertB.Raw,
@ -256,7 +244,7 @@ func TestFindStaleOCSPResponses(t *testing.T) {
test.AssertEquals(t, len(statuses), 0)
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCert.Raw,
@ -296,7 +284,7 @@ func TestFindStaleOCSPResponsesRevokedReason(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCert.Raw,
@ -330,7 +318,7 @@ func TestPipelineTick(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCert.Raw,
@ -361,7 +349,7 @@ func TestProcessExpired(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
serial := core.SerialToString(parsedCert.SerialNumber)
@ -418,7 +406,7 @@ func TestStoreResponseGuard(t *testing.T) {
defer cleanUp()
reg := satest.CreateWorkingRegistration(t, sa)
parsedCert, err := core.LoadCert("test-cert.pem")
parsedCert, err := core.LoadCert("testdata/test-cert.pem")
test.AssertNotError(t, err, "Couldn't read test certificate")
_, err = sa.AddPrecertificate(ctx, &sapb.AddCertificateRequest{
Der: parsedCert.Raw,
@ -580,7 +568,7 @@ func TestTickSleep(t *testing.T) {
// updater.tickWindow
updater.readFailures.Add(2)
before := fc.Now()
updater.tick()
updater.Tick()
test.AssertEquals(t, updater.readFailures.Value(), 3)
took := fc.Since(before)
test.Assert(t, took > updater.tickWindow, "Clock didn't move forward enough")
@ -589,7 +577,7 @@ func TestTickSleep(t *testing.T) {
// zero and the clock only moves by updater.tickWindow
updater.readOnlyDb = dbMap
before = fc.Now()
updater.tick()
updater.Tick()
test.AssertEquals(t, updater.readFailures.Value(), 0)
took = fc.Since(before)
test.AssertEquals(t, took, updater.tickWindow)
@ -606,7 +594,7 @@ func TestFindOCSPResponsesSleep(t *testing.T) {
// and the clock moved forward by more than updater.tickWindow
updater.readFailures.Add(2)
before := fc.Now()
updater.tick()
updater.Tick()
test.AssertEquals(t, updater.readFailures.Value(), 3)
took := fc.Since(before)
test.Assert(t, took > updater.tickWindow, "Clock didn't move forward enough")
@ -615,7 +603,7 @@ func TestFindOCSPResponsesSleep(t *testing.T) {
// and the clock only moves by updater.tickWindow
updater.readOnlyDb = dbMap
before = fc.Now()
updater.tick()
updater.Tick()
test.AssertEquals(t, updater.readFailures.Value(), 0)
took = fc.Since(before)
test.AssertEquals(t, took, updater.tickWindow)
@ -629,14 +617,14 @@ func mkNewUpdaterWithStrings(t *testing.T, shards []string) (*OCSPUpdater, error
fc := clock.NewFake()
updater, err := newUpdater(
updater, err := New(
metrics.NoopRegisterer,
fc,
dbMap,
dbMap,
shards,
&mockOCSP{},
OCSPUpdaterConfig{
ocsp_updater_config.Config{
OldOCSPBatchSize: 1,
OldOCSPWindow: cmd.ConfigDuration{Duration: time.Second},
SignFailureBackoffFactor: 1.5,