Merge pull request #408 from letsencrypt/ocsp-dont-stop-on-error
Don't stop the OCSP updater if there's one error, and log consistently.
This commit is contained in:
commit
ec4b94e3d9
|
@ -26,6 +26,19 @@ import (
|
|||
|
||||
const ocspResponseLimit int = 128
|
||||
|
||||
// FatalError indicates the updater should stop execution
|
||||
type FatalError string
|
||||
|
||||
func (e FatalError) Error() string { return string(e) }
|
||||
|
||||
// OCSPUpdater contains the useful objects for the Updater
|
||||
type OCSPUpdater struct {
|
||||
stats statsd.Statter
|
||||
log *blog.AuditLogger
|
||||
cac rpc.CertificateAuthorityClient
|
||||
dbMap *gorp.DbMap
|
||||
}
|
||||
|
||||
func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) {
|
||||
ch, err := cmd.AmqpChannel(c)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
|
@ -41,7 +54,7 @@ func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Erro
|
|||
return cac, closeChan
|
||||
}
|
||||
|
||||
func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, serial string) error {
|
||||
func (updater *OCSPUpdater) processResponse(tx *gorp.Transaction, serial string) error {
|
||||
certObj, err := tx.Get(core.Certificate{}, serial)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -72,7 +85,7 @@ func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, s
|
|||
RevokedAt: status.RevokedDate,
|
||||
}
|
||||
|
||||
ocspResponse, err := cac.GenerateOCSP(signRequest)
|
||||
ocspResponse, err := updater.cac.GenerateOCSP(signRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -97,43 +110,77 @@ func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, s
|
|||
return nil
|
||||
}
|
||||
|
||||
func findStaleResponses(cac rpc.CertificateAuthorityClient, dbMap *gorp.DbMap, oldestLastUpdatedTime time.Time, responseLimit int) error {
|
||||
log := blog.GetAuditLogger()
|
||||
// Produce one OCSP response for the given serial, returning err
|
||||
// if anything went wrong. This method will open and commit a transaction.
|
||||
func (updater *OCSPUpdater) updateOneSerial(serial string) error {
|
||||
innerStart := time.Now()
|
||||
// Each response gets a transaction. In the future we can increase
|
||||
// performance by batching transactions.
|
||||
// The key thing to think through is the cost of rollbacks, and whether
|
||||
// we should rollback if CA/HSM fails to sign the response or only
|
||||
// upon a partial DB insert.
|
||||
tx, err := updater.dbMap.Begin()
|
||||
if err != nil {
|
||||
updater.log.Err(fmt.Sprintf("OCSP %s: Error starting transaction, aborting: %s", serial, err))
|
||||
updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0)
|
||||
tx.Rollback()
|
||||
// Failure to begin transaction is a fatal error.
|
||||
return FatalError(err.Error())
|
||||
}
|
||||
|
||||
if err := updater.processResponse(tx, serial); err != nil {
|
||||
updater.log.Err(fmt.Sprintf("OCSP %s: Could not process OCSP Response, skipping: %s", serial, err))
|
||||
updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0)
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
updater.log.Err(fmt.Sprintf("OCSP %s: Error committing transaction, skipping: %s", serial, err))
|
||||
updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0)
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
updater.log.Info(fmt.Sprintf("OCSP %s: OK", serial))
|
||||
updater.stats.Inc("OCSP.UpdatesProcessed", 1, 1.0)
|
||||
updater.stats.TimingDuration("OCSP.UpdateTime", time.Since(innerStart), 1.0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// findStaleResponses opens a transaction and processes up to responseLimit
|
||||
// responses in a single batch. The responseLimit should be relatively small,
|
||||
// so as to limit the chance of the transaction failing due to concurrent
|
||||
// updates.
|
||||
func (updater *OCSPUpdater) findStaleResponses(oldestLastUpdatedTime time.Time, responseLimit int) error {
|
||||
var certificateStatus []core.CertificateStatus
|
||||
_, err := dbMap.Select(&certificateStatus,
|
||||
_, err := updater.dbMap.Select(&certificateStatus,
|
||||
`SELECT cs.* FROM certificateStatus AS cs JOIN certificates AS cert ON cs.serial = cert.serial
|
||||
WHERE cs.ocspLastUpdated < ? AND cert.expires > now()
|
||||
ORDER BY cs.ocspLastUpdated ASC
|
||||
LIMIT ?`, oldestLastUpdatedTime, responseLimit)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
log.Info("All up to date. No OCSP responses needed.")
|
||||
updater.log.Info("All up to date. No OCSP responses needed.")
|
||||
} else if err != nil {
|
||||
log.Err(fmt.Sprintf("Error loading certificate status: %s", err))
|
||||
updater.log.Err(fmt.Sprintf("Error loading certificate status: %s", err))
|
||||
} else {
|
||||
log.Info(fmt.Sprintf("Processing OCSP Responses...\n"))
|
||||
updater.log.Info(fmt.Sprintf("Processing OCSP Responses...\n"))
|
||||
outerStart := time.Now()
|
||||
|
||||
for i, status := range certificateStatus {
|
||||
log.Info(fmt.Sprintf("OCSP %d: %s", i, status.Serial))
|
||||
updater.log.Debug(fmt.Sprintf("OCSP %s: (%d/%d)", status.Serial, i, responseLimit))
|
||||
|
||||
// Each response gets a transaction. To speed this up, we can batch
|
||||
// transactions.
|
||||
tx, err := dbMap.Begin()
|
||||
if err != nil {
|
||||
log.Err(fmt.Sprintf("Error starting transaction, aborting: %s", err))
|
||||
tx.Rollback()
|
||||
err = updater.updateOneSerial(status.Serial)
|
||||
// Abort if we recieve a fatal error
|
||||
if _, ok := err.(FatalError); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := processResponse(cac, tx, status.Serial); err != nil {
|
||||
log.Err(fmt.Sprintf("Could not process OCSP Response for %s: %s", status.Serial, err))
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info(fmt.Sprintf("OCSP %d: %s OK", i, status.Serial))
|
||||
tx.Commit()
|
||||
}
|
||||
|
||||
updater.stats.TimingDuration("OCSP.BatchTime", time.Since(outerStart), 1.0)
|
||||
updater.stats.Inc("OCSP.BatchesProcessed", 1, 1.0)
|
||||
}
|
||||
|
||||
return err
|
||||
|
@ -185,6 +232,13 @@ func main() {
|
|||
|
||||
auditlogger.Info(app.VersionString())
|
||||
|
||||
updater := &OCSPUpdater{
|
||||
cac: cac,
|
||||
dbMap: dbMap,
|
||||
stats: stats,
|
||||
log: auditlogger,
|
||||
}
|
||||
|
||||
// Calculate the cut-off timestamp
|
||||
if c.OCSPUpdater.MinTimeToExpiry == "" {
|
||||
panic("Config must specify a MinTimeToExpiry period.")
|
||||
|
@ -197,7 +251,10 @@ func main() {
|
|||
|
||||
count := int(math.Min(float64(ocspResponseLimit), float64(c.OCSPUpdater.ResponseLimit)))
|
||||
|
||||
err = findStaleResponses(cac, dbMap, oldestLastUpdatedTime, count)
|
||||
// When we choose to batch responses, it may be best to restrict count here,
|
||||
// change the transaction to survive the whole findStaleResponses, and to
|
||||
// loop this method call however many times is appropriate.
|
||||
err = updater.findStaleResponses(oldestLastUpdatedTime, count)
|
||||
if err != nil {
|
||||
auditlogger.WarningErr(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue