From 5845f5ff00529e2408ea7f8e4a952069d522f0be Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Wed, 24 Jun 2015 13:35:25 -0700 Subject: [PATCH 1/2] Don't stop the OCSP updater if there's one error, and log consistently. - Log loop and aggregate duration timing - Log count --- cmd/ocsp-updater/main.go | 45 ++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index 1ed6a7406..5401ed37c 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -26,6 +26,13 @@ import ( const ocspResponseLimit int = 128 +// OCSPUpdater contains the useful objects for the Updater +type OCSPUpdater struct { + stats statsd.Statter + cac rpc.CertificateAuthorityClient + dbMap *gorp.DbMap +} + func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) { ch, err := cmd.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") @@ -41,7 +48,7 @@ func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Erro return cac, closeChan } -func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, serial string) error { +func (updater *OCSPUpdater) processResponse(tx *gorp.Transaction, serial string) error { certObj, err := tx.Get(core.Certificate{}, serial) if err != nil { return err @@ -72,7 +79,7 @@ func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, s RevokedAt: status.RevokedDate, } - ocspResponse, err := cac.GenerateOCSP(signRequest) + ocspResponse, err := updater.cac.GenerateOCSP(signRequest) if err != nil { return err } @@ -97,11 +104,11 @@ func processResponse(cac rpc.CertificateAuthorityClient, tx *gorp.Transaction, s return nil } -func findStaleResponses(cac rpc.CertificateAuthorityClient, dbMap *gorp.DbMap, oldestLastUpdatedTime time.Time, responseLimit int) error { +func (updater *OCSPUpdater) findStaleResponses(oldestLastUpdatedTime time.Time, responseLimit int) error { log := 
blog.GetAuditLogger() var certificateStatus []core.CertificateStatus - _, err := dbMap.Select(&certificateStatus, + _, err := updater.dbMap.Select(&certificateStatus, `SELECT cs.* FROM certificateStatus AS cs JOIN certificates AS cert ON cs.serial = cert.serial WHERE cs.ocspLastUpdated < ? AND cert.expires > now() ORDER BY cs.ocspLastUpdated ASC @@ -113,27 +120,33 @@ func findStaleResponses(cac rpc.CertificateAuthorityClient, dbMap *gorp.DbMap, o log.Err(fmt.Sprintf("Error loading certificate status: %s", err)) } else { log.Info(fmt.Sprintf("Processing OCSP Responses...\n")) + outerStart := time.Now() + for i, status := range certificateStatus { - log.Info(fmt.Sprintf("OCSP %d: %s", i, status.Serial)) + log.Debug(fmt.Sprintf("OCSP %s: #%d", status.Serial, i)) + innerStart := time.Now() // Each response gets a transaction. To speed this up, we can batch // transactions. - tx, err := dbMap.Begin() + tx, err := updater.dbMap.Begin() if err != nil { - log.Err(fmt.Sprintf("Error starting transaction, aborting: %s", err)) + log.Err(fmt.Sprintf("OCSP %s: Error starting transaction, aborting: %s", status.Serial, err)) tx.Rollback() return err } - if err := processResponse(cac, tx, status.Serial); err != nil { - log.Err(fmt.Sprintf("Could not process OCSP Response for %s: %s", status.Serial, err)) - tx.Rollback() - return err + if err := updater.processResponse(tx, status.Serial); err != nil { + log.Err(fmt.Sprintf("OCSP %s: Could not process OCSP Response: %s", status.Serial, err)) + continue } - log.Info(fmt.Sprintf("OCSP %d: %s OK", i, status.Serial)) + log.Info(fmt.Sprintf("OCSP %s: OK", status.Serial)) tx.Commit() + + updater.stats.TimingDuration("OCSP.UpdateSingle", time.Since(innerStart), 1.0) + updater.stats.Inc("OCSP.UpdatesProcessed", 1, 1.0) } + updater.stats.TimingDuration("OCSP.UpdateAggregate", time.Since(outerStart), 1.0) } return err @@ -185,6 +198,12 @@ func main() { auditlogger.Info(app.VersionString()) + updater := &OCSPUpdater{ + cac: cac, + dbMap: 
dbMap, + stats: stats, + } + // Calculate the cut-off timestamp if c.OCSPUpdater.MinTimeToExpiry == "" { panic("Config must specify a MinTimeToExpiry period.") @@ -197,7 +216,7 @@ func main() { count := int(math.Min(float64(ocspResponseLimit), float64(c.OCSPUpdater.ResponseLimit))) - err = findStaleResponses(cac, dbMap, oldestLastUpdatedTime, count) + err = updater.findStaleResponses(oldestLastUpdatedTime, count) if err != nil { auditlogger.WarningErr(err) } From 2afbcd5b44a51db211f40f9b046cf33cde375aec Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Wed, 1 Jul 2015 09:58:40 -0700 Subject: [PATCH 2/2] Rework per roland --- cmd/ocsp-updater/main.go | 88 ++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index 5401ed37c..0541e3cd1 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -26,9 +26,15 @@ import ( const ocspResponseLimit int = 128 +// FatalError indicates the updater should stop execution +type FatalError string + +func (e FatalError) Error() string { return string(e) } + // OCSPUpdater contains the useful objects for the Updater type OCSPUpdater struct { stats statsd.Statter + log *blog.AuditLogger cac rpc.CertificateAuthorityClient dbMap *gorp.DbMap } @@ -104,9 +110,50 @@ func (updater *OCSPUpdater) processResponse(tx *gorp.Transaction, serial string) return nil } -func (updater *OCSPUpdater) findStaleResponses(oldestLastUpdatedTime time.Time, responseLimit int) error { - log := blog.GetAuditLogger() +// Produce one OCSP response for the given serial, returning err +// if anything went wrong. This method will open and commit a transaction. +func (updater *OCSPUpdater) updateOneSerial(serial string) error { + innerStart := time.Now() + // Each response gets a transaction. In the future we can increase + // performance by batching transactions. 
+ // The key thing to think through is the cost of rollbacks, and whether + // we should rollback if CA/HSM fails to sign the response or only + // upon a partial DB insert. + tx, err := updater.dbMap.Begin() + if err != nil { + updater.log.Err(fmt.Sprintf("OCSP %s: Error starting transaction, aborting: %s", serial, err)) + updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0) + tx.Rollback() + // Failure to begin transaction is a fatal error. + return FatalError(err.Error()) + } + if err := updater.processResponse(tx, serial); err != nil { + updater.log.Err(fmt.Sprintf("OCSP %s: Could not process OCSP Response, skipping: %s", serial, err)) + updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0) + tx.Rollback() + return err + } + + err = tx.Commit() + if err != nil { + updater.log.Err(fmt.Sprintf("OCSP %s: Error committing transaction, skipping: %s", serial, err)) + updater.stats.Inc("OCSP.UpdatesFailed", 1, 1.0) + tx.Rollback() + return err + } + + updater.log.Info(fmt.Sprintf("OCSP %s: OK", serial)) + updater.stats.Inc("OCSP.UpdatesProcessed", 1, 1.0) + updater.stats.TimingDuration("OCSP.UpdateTime", time.Since(innerStart), 1.0) + return nil +} + +// findStaleResponses opens a transaction and processes up to responseLimit +// responses in a single batch. The responseLimit should be relatively small, +// so as to limit the chance of the transaction failing due to concurrent +// updates. +func (updater *OCSPUpdater) findStaleResponses(oldestLastUpdatedTime time.Time, responseLimit int) error { var certificateStatus []core.CertificateStatus _, err := updater.dbMap.Select(&certificateStatus, `SELECT cs.* FROM certificateStatus AS cs JOIN certificates AS cert ON cs.serial = cert.serial @@ -115,38 +162,25 @@ func (updater *OCSPUpdater) findStaleResponses(oldestLastUpdatedTime time.Time, LIMIT ?`, oldestLastUpdatedTime, responseLimit) if err == sql.ErrNoRows { - log.Info("All up to date. No OCSP responses needed.") + updater.log.Info("All up to date. 
No OCSP responses needed.") } else if err != nil { - log.Err(fmt.Sprintf("Error loading certificate status: %s", err)) + updater.log.Err(fmt.Sprintf("Error loading certificate status: %s", err)) } else { - log.Info(fmt.Sprintf("Processing OCSP Responses...\n")) + updater.log.Info(fmt.Sprintf("Processing OCSP Responses...\n")) outerStart := time.Now() for i, status := range certificateStatus { - log.Debug(fmt.Sprintf("OCSP %s: #%d", status.Serial, i)) - innerStart := time.Now() + updater.log.Debug(fmt.Sprintf("OCSP %s: (%d/%d)", status.Serial, i, responseLimit)) - // Each response gets a transaction. To speed this up, we can batch - // transactions. - tx, err := updater.dbMap.Begin() - if err != nil { - log.Err(fmt.Sprintf("OCSP %s: Error starting transaction, aborting: %s", status.Serial, err)) - tx.Rollback() + err = updater.updateOneSerial(status.Serial) + // Abort if we receive a fatal error + if _, ok := err.(FatalError); ok { return err } - - if err := updater.processResponse(tx, status.Serial); err != nil { - log.Err(fmt.Sprintf("OCSP %s: Could not process OCSP Response: %s", status.Serial, err)) - continue - } - - log.Info(fmt.Sprintf("OCSP %s: OK", status.Serial)) - tx.Commit() - - updater.stats.TimingDuration("OCSP.UpdateSingle", time.Since(innerStart), 1.0) - updater.stats.Inc("OCSP.UpdatesProcessed", 1, 1.0) } - updater.stats.TimingDuration("OCSP.UpdateAggregate", time.Since(outerStart), 1.0) + + updater.stats.TimingDuration("OCSP.BatchTime", time.Since(outerStart), 1.0) + updater.stats.Inc("OCSP.BatchesProcessed", 1, 1.0) } return err @@ -202,6 +236,7 @@ func main() { cac: cac, dbMap: dbMap, stats: stats, + log: auditlogger, } // Calculate the cut-off timestamp @@ -216,6 +251,9 @@ func main() { count := int(math.Min(float64(ocspResponseLimit), float64(c.OCSPUpdater.ResponseLimit))) + // When we choose to batch responses, it may be best to restrict count here, + // change the transaction to survive the whole findStaleResponses, and to + // loop this 
method call however many times is appropriate. err = updater.findStaleResponses(oldestLastUpdatedTime, count) if err != nil { auditlogger.WarningErr(err)