Sign OCSP in parallel for better performance. (#2422)
Previously all OCSP signing and storage would be serial, which meant it was hard to exercise the full capacity of our HSM. In this change, we run a limited number of update and store requests in parallel. This change also changes stats generation in generateOCSPResponses so we can tell the difference between stats produced by new OCSP requests vs existing ones, and adds a new stat that records how long the SQL query in findStaleOCSPResponses takes.
This commit is contained in:
		
							parent
							
								
									6ec93157f7
								
							
						
					
					
						commit
						26cf552ff9
					
				| 
						 | 
				
			
			@ -224,9 +224,10 @@ type OCSPUpdaterConfig struct {
 | 
			
		|||
	MissingSCTBatchSize         int
 | 
			
		||||
	RevokedCertificateBatchSize int
 | 
			
		||||
 | 
			
		||||
	OCSPMinTimeToExpiry ConfigDuration
 | 
			
		||||
	OCSPStaleMaxAge     ConfigDuration
 | 
			
		||||
	OldestIssuedSCT     ConfigDuration
 | 
			
		||||
	OCSPMinTimeToExpiry          ConfigDuration
 | 
			
		||||
	OCSPStaleMaxAge              ConfigDuration
 | 
			
		||||
	OldestIssuedSCT              ConfigDuration
 | 
			
		||||
	ParallelGenerateOCSPRequests int
 | 
			
		||||
 | 
			
		||||
	AkamaiBaseURL           string
 | 
			
		||||
	AkamaiClientToken       string
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -59,6 +59,9 @@ type OCSPUpdater struct {
 | 
			
		|||
	ocspStaleMaxAge time.Duration
 | 
			
		||||
	// Used to calculate how far back missing SCT receipts should be looked for
 | 
			
		||||
	oldestIssuedSCT time.Duration
 | 
			
		||||
	// Maximum number of individual OCSP updates to attempt in parallel. Making
 | 
			
		||||
	// these requests in parallel allows us to get higher total throughput.
 | 
			
		||||
	parallelGenerateOCSPRequests int
 | 
			
		||||
	// Logs we expect to have SCT receipts for. Missing logs will be resubmitted to.
 | 
			
		||||
	logs []*ctLog
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -96,6 +99,10 @@ func newUpdater(
 | 
			
		|||
		// Default to 30 days
 | 
			
		||||
		config.OCSPStaleMaxAge = cmd.ConfigDuration{Duration: time.Hour * 24 * 30}
 | 
			
		||||
	}
 | 
			
		||||
	if config.ParallelGenerateOCSPRequests == 0 {
 | 
			
		||||
		// Default to 1
 | 
			
		||||
		config.ParallelGenerateOCSPRequests = 1
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logs := make([]*ctLog, len(logConfigs))
 | 
			
		||||
	for i, logConfig := range logConfigs {
 | 
			
		||||
| 
						 | 
				
			
			@ -107,17 +114,18 @@ func newUpdater(
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	updater := OCSPUpdater{
 | 
			
		||||
		stats:               stats,
 | 
			
		||||
		clk:                 clk,
 | 
			
		||||
		dbMap:               dbMap,
 | 
			
		||||
		cac:                 ca,
 | 
			
		||||
		log:                 log,
 | 
			
		||||
		sac:                 sac,
 | 
			
		||||
		pubc:                pub,
 | 
			
		||||
		logs:                logs,
 | 
			
		||||
		ocspMinTimeToExpiry: config.OCSPMinTimeToExpiry.Duration,
 | 
			
		||||
		ocspStaleMaxAge:     config.OCSPStaleMaxAge.Duration,
 | 
			
		||||
		oldestIssuedSCT:     config.OldestIssuedSCT.Duration,
 | 
			
		||||
		stats:                        stats,
 | 
			
		||||
		clk:                          clk,
 | 
			
		||||
		dbMap:                        dbMap,
 | 
			
		||||
		cac:                          ca,
 | 
			
		||||
		log:                          log,
 | 
			
		||||
		sac:                          sac,
 | 
			
		||||
		pubc:                         pub,
 | 
			
		||||
		logs:                         logs,
 | 
			
		||||
		ocspMinTimeToExpiry:          config.OCSPMinTimeToExpiry.Duration,
 | 
			
		||||
		ocspStaleMaxAge:              config.OCSPStaleMaxAge.Duration,
 | 
			
		||||
		oldestIssuedSCT:              config.OldestIssuedSCT.Duration,
 | 
			
		||||
		parallelGenerateOCSPRequests: config.ParallelGenerateOCSPRequests,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Setup loops
 | 
			
		||||
| 
						 | 
				
			
			@ -380,7 +388,7 @@ func (updater *OCSPUpdater) newCertificateTick(ctx context.Context, batchSize in
 | 
			
		|||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return updater.generateOCSPResponses(ctx, statuses)
 | 
			
		||||
	return updater.generateOCSPResponses(ctx, statuses, updater.stats.NewScope("newCertificateTick"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (updater *OCSPUpdater) findRevokedCertificatesToUpdate(batchSize int) ([]core.CertificateStatus, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -430,22 +438,46 @@ func (updater *OCSPUpdater) revokedCertificatesTick(ctx context.Context, batchSi
 | 
			
		|||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses []core.CertificateStatus) error {
 | 
			
		||||
	for _, status := range statuses {
 | 
			
		||||
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses []core.CertificateStatus, stats metrics.Scope) error {
 | 
			
		||||
	// Use the semaphore pattern from
 | 
			
		||||
	// https://github.com/golang/go/wiki/BoundingResourceUse to send a number of
 | 
			
		||||
	// GenerateOCSP / storeResponse requests in parallel, while limiting the total number of
 | 
			
		||||
	// outstanding requests. The number of outstanding requests equals the
 | 
			
		||||
	// capacity of the channel.
 | 
			
		||||
	sem := make(chan int, updater.parallelGenerateOCSPRequests)
 | 
			
		||||
	wait := func() {
 | 
			
		||||
		sem <- 1 // Block until there's capacity.
 | 
			
		||||
	}
 | 
			
		||||
	done := func() {
 | 
			
		||||
		<-sem // Indicate there's more capacity.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	work := func(status core.CertificateStatus) {
 | 
			
		||||
		defer done()
 | 
			
		||||
		meta, err := updater.generateResponse(ctx, status)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			updater.log.AuditErr(fmt.Sprintf("Failed to generate OCSP response: %s", err))
 | 
			
		||||
			updater.stats.Inc("Errors.ResponseGeneration", 1)
 | 
			
		||||
			return err
 | 
			
		||||
			stats.Inc("Errors.ResponseGeneration", 1)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		updater.stats.Inc("GeneratedResponses", 1)
 | 
			
		||||
		err = updater.storeResponse(meta)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			updater.log.AuditErr(fmt.Sprintf("Failed to store OCSP response: %s", err))
 | 
			
		||||
			updater.stats.Inc("Errors.StoreResponse", 1)
 | 
			
		||||
			continue
 | 
			
		||||
			stats.Inc("Errors.StoreResponse", 1)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		updater.stats.Inc("StoredResponses", 1)
 | 
			
		||||
		stats.Inc("StoredResponses", 1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, status := range statuses {
 | 
			
		||||
		wait()
 | 
			
		||||
		go work(status)
 | 
			
		||||
	}
 | 
			
		||||
	// Block until the channel reaches its full capacity again, indicating each
 | 
			
		||||
	// goroutine has completed.
 | 
			
		||||
	for i := 0; i < updater.parallelGenerateOCSPRequests; i++ {
 | 
			
		||||
		wait()
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -453,15 +485,16 @@ func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses
 | 
			
		|||
// oldOCSPResponsesTick looks for certificates with stale OCSP responses and
 | 
			
		||||
// generates/stores new ones
 | 
			
		||||
func (updater *OCSPUpdater) oldOCSPResponsesTick(ctx context.Context, batchSize int) error {
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	statuses, err := updater.findStaleOCSPResponses(now.Add(-updater.ocspMinTimeToExpiry), batchSize)
 | 
			
		||||
	tickStart := time.Now()
 | 
			
		||||
	statuses, err := updater.findStaleOCSPResponses(tickStart.Add(-updater.ocspMinTimeToExpiry), batchSize)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		updater.stats.Inc("Errors.FindStaleResponses", 1)
 | 
			
		||||
		updater.log.AuditErr(fmt.Sprintf("Failed to find stale OCSP responses: %s", err))
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	updater.stats.TimingDuration("oldOCSPResponsesTick.QueryTime", time.Since(tickStart))
 | 
			
		||||
 | 
			
		||||
	return updater.generateOCSPResponses(ctx, statuses)
 | 
			
		||||
	return updater.generateOCSPResponses(ctx, statuses, updater.stats.NewScope("oldOCSPResponsesTick"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (updater *OCSPUpdater) getSerialsIssuedSince(since time.Time, batchSize int) ([]string, error) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,7 +30,9 @@ import (
 | 
			
		|||
 | 
			
		||||
var ctx = context.Background()
 | 
			
		||||
 | 
			
		||||
type mockCA struct{}
 | 
			
		||||
type mockCA struct {
 | 
			
		||||
	sleepTime time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ca *mockCA) IssueCertificate(_ context.Context, csr x509.CertificateRequest, regID int64) (core.Certificate, error) {
 | 
			
		||||
	return core.Certificate{}, nil
 | 
			
		||||
| 
						 | 
				
			
			@ -38,6 +40,7 @@ func (ca *mockCA) IssueCertificate(_ context.Context, csr x509.CertificateReques
 | 
			
		|||
 | 
			
		||||
func (ca *mockCA) GenerateOCSP(_ context.Context, xferObj core.OCSPSigningRequest) (ocsp []byte, err error) {
 | 
			
		||||
	ocsp = []byte{1, 2, 3}
 | 
			
		||||
	time.Sleep(ca.sleepTime)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -220,8 +223,20 @@ func TestGenerateOCSPResponses(t *testing.T) {
 | 
			
		|||
	test.AssertNotError(t, err, "Couldn't find stale responses")
 | 
			
		||||
	test.AssertEquals(t, len(certs), 2)
 | 
			
		||||
 | 
			
		||||
	err = updater.generateOCSPResponses(ctx, certs)
 | 
			
		||||
	// Hacky test of parallelism: Make each request to the CA take 1 second, and
 | 
			
		||||
	// produce 2 requests to the CA. If the pair of requests complete in about a
 | 
			
		||||
	// second, they were made in parallel.
 | 
			
		||||
	// Note that this test also tests the basic functionality of
 | 
			
		||||
	// generateOCSPResponses.
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	updater.cac = &mockCA{time.Second}
 | 
			
		||||
	updater.parallelGenerateOCSPRequests = 10
 | 
			
		||||
	err = updater.generateOCSPResponses(ctx, certs, metrics.NewNoopScope())
 | 
			
		||||
	test.AssertNotError(t, err, "Couldn't generate OCSP responses")
 | 
			
		||||
	elapsed := time.Since(start)
 | 
			
		||||
	if elapsed > 1500*time.Millisecond {
 | 
			
		||||
		t.Errorf("generateOCSPResponses took too long, expected it to make calls in parallel.")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	certs, err = updater.findStaleOCSPResponses(earliest, 10)
 | 
			
		||||
	test.AssertNotError(t, err, "Failed to find stale responses")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,7 @@
 | 
			
		|||
    "newCertificateBatchSize": 1000,
 | 
			
		||||
    "oldOCSPBatchSize": 5000,
 | 
			
		||||
    "missingSCTBatchSize": 5000,
 | 
			
		||||
    "parallelGenerateOCSPRequests": 10,
 | 
			
		||||
    "revokedCertificateBatchSize": 1000,
 | 
			
		||||
    "ocspMinTimeToExpiry": "72h",
 | 
			
		||||
    "ocspStaleMaxAge": "720h",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,6 +8,7 @@
 | 
			
		|||
    "revokedCertificateWindow": "1s",
 | 
			
		||||
    "newCertificateBatchSize": 1000,
 | 
			
		||||
    "oldOCSPBatchSize": 5000,
 | 
			
		||||
    "parallelGenerateOCSPRequests": 10,
 | 
			
		||||
    "missingSCTBatchSize": 5000,
 | 
			
		||||
    "revokedCertificateBatchSize": 1000,
 | 
			
		||||
    "ocspMinTimeToExpiry": "72h",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue