Add logging of OCSP generation events (#5223)
This adds a new component to the CA, ocspLogQueue, which batches up OCSP generation events for audit logging. It logs the accumulated events when the line they build up reaches a certain length, or when a maximum amount of time has passed.
parent f5982c6d44
commit 8b9145838d

 ca/ca.go | 22 ++++++++++++++++++++++
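For illustration (the entry format is defined in ca/ocsp_log.go below): each queued event is serialized as the serial in hex, a colon, the numeric response status, and a trailing comma, 39 bytes per entry, so a full 4000-byte line holds roughly 100 entries. A flushed audit line, as the new tests expect it, looks like:

    INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,aabbccddeeffaabbccddeeff000102030405:0,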
ca/ca.go
@@ -139,6 +139,7 @@ type CertificateAuthorityImpl struct {
     ocspLifetime   time.Duration
     keyPolicy      goodkey.KeyPolicy
     orphanQueue    *goque.Queue
+    ocspLogQueue   *ocspLogQueue
     clk            clock.Clock
     log            blog.Logger
     signatureCount *prometheus.CounterVec
@@ -263,6 +264,8 @@ func NewCertificateAuthorityImpl(
     ocspLifetime time.Duration,
     keyPolicy goodkey.KeyPolicy,
     orphanQueue *goque.Queue,
+    ocspLogMaxLength int,
+    ocspLogPeriod time.Duration,
     logger blog.Logger,
     stats prometheus.Registerer,
     clk clock.Clock,
@@ -357,6 +360,8 @@ func NewCertificateAuthorityImpl(
     }, []string{"type"})
     stats.MustRegister(signErrorCounter)
 
+    ocspLogQueue := newOCSPLogQueue(ocspLogMaxLength, ocspLogPeriod, stats, logger)
+
     ca = &CertificateAuthorityImpl{
         sa: sa,
         pa: pa,
@@ -370,6 +375,7 @@ func NewCertificateAuthorityImpl(
         ocspLifetime:      ocspLifetime,
         keyPolicy:         keyPolicy,
         orphanQueue:       orphanQueue,
+        ocspLogQueue:      ocspLogQueue,
         log:               logger,
         signatureCount:    signatureCount,
         csrExtensionCount: csrExtensionCount,
@@ -527,6 +533,8 @@ func (ca *CertificateAuthorityImpl) GenerateOCSP(ctx context.Context, req *capb.
         tbsResponse.RevocationReason = int(req.Reason)
     }
 
+    ca.ocspLogQueue.enqueue(serial.Bytes(), now, ocsp.ResponseStatus(tbsResponse.Status))
+
     ocspResponse, err := ocsp.CreateResponse(issuer.cert.Certificate, issuer.cert.Certificate, tbsResponse, issuer.ocspSigner)
     ca.noteSignError(err)
     if err == nil {
@@ -932,6 +940,20 @@ func (ca *CertificateAuthorityImpl) OrphanIntegrationLoop() {
     }
 }
 
+// LogOCSPLoop collects OCSP generation log events into bundles, and logs
+// them periodically.
+func (ca *CertificateAuthorityImpl) LogOCSPLoop() {
+    ca.ocspLogQueue.loop()
+}
+
+// Stop asks this CertificateAuthorityImpl to shut down. It must be called
+// after the corresponding RPC service is shut down and there are no longer
+// any inflight RPCs. It will attempt to drain any logging queues (which may
+// block), and will return only when done.
+func (ca *CertificateAuthorityImpl) Stop() {
+    ca.ocspLogQueue.stop()
+}
+
 // integrateOrpan removes an orphan from the queue and adds it to the database. The
 // item isn't dequeued until it is actually added to the database to prevent items from
 // being lost if the CA is restarted between the item being dequeued and being added to
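The Stop contract above rests on a standard Go idiom that ca/ocsp_log.go below implements: the queue is a buffered channel, stop() closes it and waits on a WaitGroup, and loop() drains until the channel is closed. Sending on a closed channel panics, which is why Stop() may only run once the RPC servers have quiesced. A minimal stand-alone sketch of that idiom (generic names, not Boulder's):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        queue := make(chan string, 10)
        var wg sync.WaitGroup
        wg.Add(1)
        // The consumer plays the role of ocspLogQueue.loop().
        go func() {
            defer wg.Done()
            for entry := range queue { // exits once queue is closed and drained
                fmt.Println("logged:", entry)
            }
        }()
        queue <- "event-1" // safe: the channel is still open
        close(queue)       // like stop(): no sends may happen after this
        wg.Wait()          // block until the consumer has drained everything
        // queue <- "event-2" // would panic: send on closed channel
    }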
ca/ca_test.go
@@ -328,6 +328,8 @@ func TestFailNoSerialPrefix(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -438,6 +440,8 @@ func issueCertificateSubTestSetup(t *testing.T, boulderIssuer bool) (*Certificat
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -500,6 +504,8 @@ func TestMultipleIssuers(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -533,6 +539,8 @@ func TestOCSP(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -592,6 +600,8 @@ func TestOCSP(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -697,6 +707,8 @@ func TestInvalidCSRs(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -736,6 +748,8 @@ func TestRejectValidityTooLong(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -795,6 +809,8 @@ func TestSingleAIAEnforcement(t *testing.T) {
         time.Second,
         goodkey.KeyPolicy{},
         nil,
+        0,
+        time.Second,
         &blog.Mock{},
         metrics.NoopRegisterer,
         clock.New(),
@@ -911,6 +927,8 @@ func TestIssueCertificateForPrecertificate(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1001,6 +1019,8 @@ func TestIssueCertificateForPrecertificateDuplicateSerial(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1045,6 +1065,8 @@ func TestIssueCertificateForPrecertificateDuplicateSerial(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1126,6 +1148,8 @@ func TestPrecertOrphanQueue(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         orphanQueue,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1196,6 +1220,8 @@ func TestOrphanQueue(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         orphanQueue,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1316,6 +1342,8 @@ func TestIssuePrecertificateLinting(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
@@ -1380,6 +1408,8 @@ func TestGenerateOCSPWithIssuerID(t *testing.T) {
         testCtx.ocspLifetime,
         testCtx.keyPolicy,
         nil,
+        0,
+        time.Second,
         testCtx.logger,
         testCtx.stats,
         testCtx.fc)
ca/ocsp_log.go (new file)
@@ -0,0 +1,131 @@
+package ca
+
+// TODO(#5226): Move the GenerateOCSP service into this file too.
+
+import (
+    "fmt"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/jmhodges/clock"
+    "github.com/prometheus/client_golang/prometheus"
+    "golang.org/x/crypto/ocsp"
+
+    blog "github.com/letsencrypt/boulder/log"
+)
+
+// ocspLogQueue accumulates OCSP logging events and writes several of them
+// in a single log line. This reduces the number of log lines and bytes,
+// which would otherwise be quite high. As of Jan 2021 we do approximately
+// 550 rps of OCSP generation events. We can turn that into about 5.5 rps
+// of log lines if we accumulate 100 entries per line, which amounts to about
+// 3900 bytes per log line.
+// Summary of log line usage:
+// serial in hex: 36 bytes, separator characters: 2 bytes, status: 1 byte
+// If maxLogLen is 0, do not perform any accumulation or logging.
+type ocspLogQueue struct {
+    // Maximum length, in bytes, of a single log line.
+    maxLogLen int
+    // Maximum amount of time between OCSP logging events.
+    period time.Duration
+    queue  chan ocspLog
+    // This allows the stop() function to block until we've drained the queue.
+    wg     sync.WaitGroup
+    depth  prometheus.Gauge
+    logger blog.Logger
+    clk    clock.Clock
+}
+
+type ocspLog struct {
+    serial []byte
+    time   time.Time
+    status ocsp.ResponseStatus
+}
+
+func newOCSPLogQueue(
+    maxLogLen int,
+    period time.Duration,
+    stats prometheus.Registerer,
+    logger blog.Logger,
+) *ocspLogQueue {
+    depth := prometheus.NewGauge(
+        prometheus.GaugeOpts{
+            Name: "ocsp_log_queue_depth",
+            Help: "Number of OCSP generation log entries waiting to be written",
+        })
+    stats.MustRegister(depth)
+    olq := ocspLogQueue{
+        maxLogLen: maxLogLen,
+        period:    period,
+        queue:     make(chan ocspLog, 1000),
+        wg:        sync.WaitGroup{},
+        depth:     depth,
+        logger:    logger,
+        clk:       clock.Default(),
+    }
+    olq.wg.Add(1)
+    return &olq
+}
+
+func (olq *ocspLogQueue) enqueue(serial []byte, time time.Time, status ocsp.ResponseStatus) {
+    olq.queue <- ocspLog{
+        serial: append([]byte{}, serial...),
+        time:   time,
+        status: status,
+    }
+}
+
+// To ensure we don't go over the max log line length, use a safety margin
+// equal to the expected length of an entry.
+const ocspSingleLogEntryLen = 39
+
+// loop consumes events from the queue channel, batches them up, and
+// logs them in batches of maxLogLen / 39, or every `period`,
+// whichever comes first.
+func (olq *ocspLogQueue) loop() {
+    defer olq.wg.Done()
+    if olq.maxLogLen == 0 {
+        return
+    }
+    done := false
+    for !done {
+        var builder strings.Builder
+        deadline := olq.clk.After(olq.period)
+    inner:
+        for {
+            olq.depth.Set(float64(len(olq.queue)))
+            select {
+            case ol, ok := <-olq.queue:
+                if !ok {
+                    // Channel was closed, finish.
+                    done = true
+                    break inner
+                }
+                fmt.Fprintf(&builder, "%x:%d,", ol.serial, ol.status)
+            case <-deadline:
+                break inner
+            }
+            if builder.Len()+ocspSingleLogEntryLen > olq.maxLogLen {
+                break
+            }
+        }
+        if builder.Len() > 0 {
+            olq.logger.AuditInfof("OCSP signed: %s", builder.String())
+        }
+    }
+}
+
+// stop the loop, and wait for it to finish. This must be called only after
+// it's guaranteed that nothing will call enqueue again (for instance, after
+// the OCSPGenerator and CertificateAuthority services are shut down with
+// no RPCs in flight). Otherwise, enqueue will panic.
+// If this is called without previously starting a goroutine running `.loop()`,
+// it will block forever.
+func (olq *ocspLogQueue) stop() {
+    if olq.maxLogLen == 0 {
+        return
+    }
+    close(olq.queue)
+    olq.wg.Wait()
+}
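To see loop()'s flush strategy in isolation, here is a minimal, self-contained distillation of the same accumulate-until-length-or-deadline pattern; all names are illustrative stand-ins, not Boulder APIs. With a 100-byte budget and 39-byte entries it prints three lines holding 2, 2, and 1 entries, mirroring TestOcspFlushOnLength in the tests below.

    // A sketch of ocspLogQueue's flush strategy: entries accumulate onto one
    // line until adding another would exceed the length budget, or until a
    // deadline fires, whichever comes first.
    package main

    import (
        "fmt"
        "strings"
        "time"
    )

    func main() {
        queue := make(chan string, 1000)
        go func() {
            for i := 0; i < 5; i++ {
                // 36 hex chars + ":" + status + "," = 39 bytes, like one OCSP entry.
                queue <- fmt.Sprintf("%036x:0,", i)
            }
            close(queue)
        }()

        const maxLineLen = 100
        const entryLen = 39
        done := false
        for !done {
            var line strings.Builder
            deadline := time.After(500 * time.Millisecond)
        inner:
            for {
                select {
                case entry, ok := <-queue:
                    if !ok {
                        done = true
                        break inner
                    }
                    line.WriteString(entry)
                case <-deadline:
                    break inner
                }
                if line.Len()+entryLen > maxLineLen {
                    break
                }
            }
            if line.Len() > 0 {
                fmt.Println("OCSP signed:", line.String())
            }
        }
    }

Checking the budget after each append, with a one-entry safety margin, is what keeps a line from ever exceeding the maximum length.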
ca/ocsp_log_test.go (new file)
@@ -0,0 +1,130 @@
+package ca
+
+import (
+    "encoding/hex"
+    "testing"
+    "time"
+
+    blog "github.com/letsencrypt/boulder/log"
+    "github.com/letsencrypt/boulder/metrics"
+    "github.com/letsencrypt/boulder/test"
+    "golang.org/x/crypto/ocsp"
+)
+
+func serial(t *testing.T) []byte {
+    serial, err := hex.DecodeString("aabbccddeeffaabbccddeeff000102030405")
+    if err != nil {
+        t.Fatal(err)
+    }
+    return serial
+
+}
+
+// Set up an ocspLogQueue with a very long period and a large maxLen,
+// to ensure any buffered entries get flushed on `.stop()`.
+func TestOcspLogFlushOnExit(t *testing.T) {
+    t.Parallel()
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(4000, 10000*time.Millisecond, stats, log)
+    go queue.loop()
+    queue.enqueue(serial(t), time.Now(), ocsp.ResponseStatus(ocsp.Good))
+    queue.stop()
+
+    expected := []string{
+        "INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,",
+    }
+    test.AssertDeepEquals(t, log.GetAll(), expected)
+}
+
+// Ensure log lines are sent when they exceed maxLen.
+func TestOcspFlushOnLength(t *testing.T) {
+    t.Parallel()
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(100, 100*time.Millisecond, stats, log)
+    go queue.loop()
+    for i := 0; i < 5; i++ {
+        queue.enqueue(serial(t), time.Now(), ocsp.ResponseStatus(ocsp.Good))
+    }
+    queue.stop()
+
+    expected := []string{
+        "INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,aabbccddeeffaabbccddeeff000102030405:0,",
+        "INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,aabbccddeeffaabbccddeeff000102030405:0,",
+        "INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,",
+    }
+    test.AssertDeepEquals(t, log.GetAll(), expected)
+}
+
+// Ensure log lines are sent after a timeout.
+func TestOcspFlushOnTimeout(t *testing.T) {
+    t.Parallel()
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(90000, 10*time.Millisecond, stats, log)
+
+    go queue.loop()
+    queue.enqueue(serial(t), time.Now(), ocsp.ResponseStatus(ocsp.Good))
+    // This gets a little tricky: Each iteration of the `select` in
+    // queue.loop() is a race between the channel that receives log
+    // events and the `<-clk.After(n)` timer. Even if we used
+    // a fake clock, our loop here would often win that race, producing
+    // inconsistent logging results. For instance, it would be entirely
+    // possible for all of these `enqueues` to win the race, putting
+    // all log entries on one line.
+    // To avoid that, sleep using the wall clock for 50ms.
+    time.Sleep(50 * time.Millisecond)
+
+    expected := []string{
+        "INFO: [AUDIT] OCSP signed: aabbccddeeffaabbccddeeff000102030405:0,",
+    }
+    test.AssertDeepEquals(t, log.GetAll(), expected)
+    queue.stop()
+}
+
+// If the deadline passes and nothing has been logged, we should not log a blank line.
+func TestOcspNoEmptyLines(t *testing.T) {
+    t.Parallel()
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(90000, 10*time.Millisecond, stats, log)
+
+    go queue.loop()
+    time.Sleep(50 * time.Millisecond)
+    queue.stop()
+
+    test.AssertDeepEquals(t, log.GetAll(), []string{})
+}
+
+// If the maxLogLen is 0, don't log anything.
+func TestOcspLogMaxLenZeroMeansNoLog(t *testing.T) {
+    t.Parallel()
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(0, 10000*time.Millisecond, stats, log)
+    go queue.loop()
+    queue.enqueue(serial(t), time.Now(), ocsp.ResponseStatus(ocsp.Good))
+    queue.stop()
+
+    test.AssertDeepEquals(t, log.GetAll(), []string{})
+}
+
+// Enqueueing entries after stop causes panic.
+func TestOcspLogPanicsOnEnqueueAfterStop(t *testing.T) {
+    t.Parallel()
+
+    log := blog.NewMock()
+    stats := metrics.NoopRegisterer
+    queue := newOCSPLogQueue(4000, 10000*time.Millisecond, stats, log)
+    go queue.loop()
+    queue.stop()
+
+    defer func() {
+        if r := recover(); r == nil {
+            t.Errorf("The code did not panic")
+        }
+    }()
+
+    queue.enqueue(serial(t), time.Now(), ocsp.ResponseStatus(ocsp.Good))
+}
cmd/boulder-ca/main.go
@@ -89,6 +89,20 @@ type config struct {
         // is not used.
         OrphanQueueDir string
 
+        // Maximum length (in bytes) of a line accumulating OCSP audit log entries.
+        // Recommended to be around 4000. If this is 0, do not perform OCSP audit
+        // logging.
+        OCSPLogMaxLength int
+
+        // Maximum period (in Go duration format) to wait to accumulate a max-length
+        // OCSP audit log line. We will emit a log line at least once per period,
+        // if there is anything to be logged. Keeping this low minimizes the risk
+        // of losing logs during a catastrophic failure. Making it too low
+        // means logging more often than necessary, which is inefficient in terms
+        // of bytes and log system resources.
+        // Recommended to be around 500ms.
+        OCSPLogPeriod cmd.ConfigDuration
+
         Features map[string]bool
     }
 
@@ -302,6 +316,8 @@ func main() {
         c.CA.LifespanOCSP.Duration,
         kp,
         orphanQueue,
+        c.CA.OCSPLogMaxLength,
+        c.CA.OCSPLogPeriod.Duration,
         logger,
         scope,
         clk)
@@ -310,6 +326,7 @@ func main() {
     if orphanQueue != nil {
         go cai.OrphanIntegrationLoop()
     }
+    go cai.LogOCSPLoop()
 
     serverMetrics := bgrpc.NewServerMetrics(scope)
 
@@ -339,6 +356,7 @@ func main() {
         ocspHealth.Shutdown()
         caSrv.GracefulStop()
         ocspSrv.GracefulStop()
+        cai.Stop()
     })
 
     select {}
CA test config, instance "a":
@@ -85,6 +85,8 @@
     "weakKeyFile": "test/example-weak-keys.json",
     "blockedKeyFile": "test/example-blocked-keys.yaml",
     "orphanQueueDir": "/tmp/orphaned-certificates-a",
+    "ocspLogMaxLength": 4000,
+    "ocspLogPeriod": "500ms",
     "features": {
         "NonCFSSLSigner": true,
         "StoreIssuerInfo": true

CA test config, instance "b":
@@ -85,6 +85,8 @@
     "weakKeyFile": "test/example-weak-keys.json",
     "blockedKeyFile": "test/example-blocked-keys.yaml",
     "orphanQueueDir": "/tmp/orphaned-certificates-b",
+    "ocspLogMaxLength": 4000,
+    "ocspLogPeriod": "500ms",
     "features": {
         "NonCFSSLSigner": true,
         "StoreIssuerInfo": true
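As a rough sanity check on these values: at 4000 bytes per line and 39 bytes per entry, a line holds about 102 entries, and at the ~550 generation events per second cited in ca/ocsp_log.go a line fills in roughly 0.19 seconds. In steady state, then, the length limit rather than the 500ms period triggers most flushes; the period mainly bounds how long a partial line can sit unflushed during quiet traffic.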