crl-updater: query by explicit shard too (#7973)

Add querying by explicit shard (SA.GetRevokedCertsByShard) in addition
to querying by temporal shard (SA.GetRevokedCerts).

Merge results from both kinds of shard. De-duplicate by serial within a
shard, because the same certificate could wind up in a temporal shard
that matches its explicit shard.

When de-duplicating, validate that revocation reasons are the same or
(very unlikely) represent a re-revocation based on demonstrating key
compromise. This can happen because the two different SA queries occur
at slightly different times.

Add unit testing that CRL entries make it through the whole pipeline
from SA, to CA, to uploader.

Rename some types in the unittest to be more accessible.

Tweak a comment in SA.UpdateRevokedCertificate to make it clear that
status _and_ reason are critical for re-revocation.

Note: This GetRevokedCertsByShard code path will always return zero
certificates right now, because nothing is writing to the
`revokedCertificates` table. Writing to that table is gated on
certificates having CRL URLs in them, which is not yet implemented (and
will be config-gated).

Part of #7094
This commit is contained in:
Jacob Hoffman-Andrews 2025-01-27 10:11:09 -08:00 committed by GitHub
parent 3fcaebe934
commit e0221b6bbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 410 additions and 65 deletions

View File

@ -26,9 +26,9 @@ func TestRunOnce(t *testing.T) {
[]*issuance.Certificate{e1, r3}, []*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour, 2, 18*time.Hour, 24*time.Hour,
6*time.Hour, time.Minute, 1, 1, 6*time.Hour, time.Minute, 1, 1,
&fakeSAC{grcc: fakeGRCC{err: errors.New("db no worky")}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)}, &fakeSAC{revokedCerts: revokedCertsStream{err: errors.New("db no worky")}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}}, &fakeCA{gcc: generateCRLStream{}},
&fakeCSC{ucc: fakeUCC{}}, &fakeStorer{uploaderStream: &noopUploader{}},
metrics.NoopRegisterer, mockLog, clk, metrics.NoopRegisterer, mockLog, clk,
) )
test.AssertNotError(t, err, "building test crlUpdater") test.AssertNotError(t, err, "building test crlUpdater")

View File

@ -11,6 +11,7 @@ import (
"github.com/jmhodges/clock" "github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ocsp"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
@ -183,11 +184,72 @@ func (cu *crlUpdater) updateShardWithRetry(ctx context.Context, atTime time.Time
return nil return nil
} }
type crlStream interface {
Recv() (*proto.CRLEntry, error)
}
// reRevoked returns the later of the two entries, only if the latter represents a valid
// re-revocation of the former (reason == KeyCompromise).
func reRevoked(a *proto.CRLEntry, b *proto.CRLEntry) (*proto.CRLEntry, error) {
first, second := a, b
if b.RevokedAt.AsTime().Before(a.RevokedAt.AsTime()) {
first, second = b, a
}
if first.Reason != ocsp.KeyCompromise && second.Reason == ocsp.KeyCompromise {
return second, nil
}
// The RA has logic to prevent re-revocation for any reason other than KeyCompromise,
// so this should be impossible. The best we can do is error out.
return nil, fmt.Errorf("certificate %s was revoked with reason %d at %s and re-revoked with invalid reason %d at %s",
first.Serial, first.Reason, first.RevokedAt.AsTime(), second.Reason, second.RevokedAt.AsTime())
}
// addFromStream pulls `proto.CRLEntry` objects from a stream, adding them to the crlEntries map.
//
// Consolidates duplicates and checks for internal consistency of the results.
//
// Returns the number of entries received from the stream, regardless of duplicate status.
func addFromStream(crlEntries map[string]*proto.CRLEntry, stream crlStream) (int, error) {
var count int
for {
entry, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return 0, fmt.Errorf("retrieving entry from SA: %w", err)
}
count++
previousEntry := crlEntries[entry.Serial]
if previousEntry == nil {
crlEntries[entry.Serial] = entry
continue
}
if previousEntry.Reason == entry.Reason &&
previousEntry.RevokedAt.AsTime().Equal(entry.RevokedAt.AsTime()) {
continue
}
// There's a tiny possibility a certificate was re-revoked for KeyCompromise and
// we got a different view of it from temporal sharding vs explicit sharding.
// Prefer the re-revoked CRL entry, which must be the one with KeyCompromise.
second, err := reRevoked(entry, previousEntry)
if err != nil {
return 0, err
}
crlEntries[entry.Serial] = second
}
return count, nil
}
// updateShard processes a single shard. It computes the shard's boundaries, gets // updateShard processes a single shard. It computes the shard's boundaries, gets
// the list of revoked certs in that shard from the SA, gets the CA to sign the // the list of revoked certs in that shard from the SA, gets the CA to sign the
// resulting CRL, and gets the crl-storer to upload it. It returns an error if // resulting CRL, and gets the crl-storer to upload it. It returns an error if
// any of these operations fail. // any of these operations fail.
func (cu *crlUpdater) updateShard(ctx context.Context, atTime time.Time, issuerNameID issuance.NameID, shardIdx int, chunks []chunk) (err error) { func (cu *crlUpdater) updateShard(ctx context.Context, atTime time.Time, issuerNameID issuance.NameID, shardIdx int, chunks []chunk) (err error) {
if shardIdx <= 0 {
return fmt.Errorf("invalid shard %d", shardIdx)
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -207,8 +269,10 @@ func (cu *crlUpdater) updateShard(ctx context.Context, atTime time.Time, issuerN
cu.log.Infof( cu.log.Infof(
"Generating CRL shard: id=[%s] numChunks=[%d]", crlID, len(chunks)) "Generating CRL shard: id=[%s] numChunks=[%d]", crlID, len(chunks))
// Get the full list of CRL Entries for this shard from the SA. // Deduplicate the CRL entries by serial number, since we can get the same certificate via
var crlEntries []*proto.CRLEntry // both temporal sharding (GetRevokedCerts) and explicit sharding (GetRevokedCertsByShard).
crlEntries := make(map[string]*proto.CRLEntry)
for _, chunk := range chunks { for _, chunk := range chunks {
saStream, err := cu.sa.GetRevokedCerts(ctx, &sapb.GetRevokedCertsRequest{ saStream, err := cu.sa.GetRevokedCerts(ctx, &sapb.GetRevokedCertsRequest{
IssuerNameID: int64(issuerNameID), IssuerNameID: int64(issuerNameID),
@ -217,25 +281,41 @@ func (cu *crlUpdater) updateShard(ctx context.Context, atTime time.Time, issuerN
RevokedBefore: timestamppb.New(atTime), RevokedBefore: timestamppb.New(atTime),
}) })
if err != nil { if err != nil {
return fmt.Errorf("connecting to SA: %w", err) return fmt.Errorf("GetRevokedCerts: %w", err)
} }
for { n, err := addFromStream(crlEntries, saStream)
entry, err := saStream.Recv() if err != nil {
if err != nil { return fmt.Errorf("streaming GetRevokedCerts: %w", err)
if err == io.EOF {
break
}
return fmt.Errorf("retrieving entry from SA: %w", err)
}
crlEntries = append(crlEntries, entry)
} }
cu.log.Infof( cu.log.Infof(
"Queried SA for CRL shard: id=[%s] expiresAfter=[%s] expiresBefore=[%s] numEntries=[%d]", "Queried SA for CRL shard: id=[%s] expiresAfter=[%s] expiresBefore=[%s] numEntries=[%d]",
crlID, chunk.start, chunk.end, len(crlEntries)) crlID, chunk.start, chunk.end, n)
} }
// Query for unexpired certificates, with padding to ensure that revoked certificates show
// up in at least one CRL, even if they expire between revocation and CRL generation.
expiresAfter := cu.clk.Now().Add(cu.lookbackPeriod)
saStream, err := cu.sa.GetRevokedCertsByShard(ctx, &sapb.GetRevokedCertsByShardRequest{
IssuerNameID: int64(issuerNameID),
ShardIdx: int64(shardIdx),
ExpiresAfter: timestamppb.New(expiresAfter),
RevokedBefore: timestamppb.New(atTime),
})
if err != nil {
return fmt.Errorf("GetRevokedCertsByShard: %w", err)
}
n, err := addFromStream(crlEntries, saStream)
if err != nil {
return fmt.Errorf("streaming GetRevokedCertsByShard: %w", err)
}
cu.log.Infof(
"Queried SA by CRL shard number: id=[%s] shardIdx=[%d] numEntries=[%d]", crlID, shardIdx, n)
// Send the full list of CRL Entries to the CA. // Send the full list of CRL Entries to the CA.
caStream, err := cu.ca.GenerateCRL(ctx) caStream, err := cu.ca.GenerateCRL(ctx)
if err != nil { if err != nil {

View File

@ -1,12 +1,16 @@
package updater package updater
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"io" "io"
"reflect"
"testing" "testing"
"time" "time"
"golang.org/x/crypto/ocsp"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
@ -24,17 +28,17 @@ import (
"github.com/letsencrypt/boulder/test" "github.com/letsencrypt/boulder/test"
) )
// fakeGRCC is a fake grpc.ClientStreamingClient which can be // revokedCertsStream is a fake grpc.ClientStreamingClient which can be
// populated with some CRL entries or an error for use as the return value of // populated with some CRL entries or an error for use as the return value of
// a faked GetRevokedCerts call. // a faked GetRevokedCerts call.
type fakeGRCC struct { type revokedCertsStream struct {
grpc.ClientStream grpc.ClientStream
entries []*corepb.CRLEntry entries []*corepb.CRLEntry
nextIdx int nextIdx int
err error err error
} }
func (f *fakeGRCC) Recv() (*corepb.CRLEntry, error) { func (f *revokedCertsStream) Recv() (*corepb.CRLEntry, error) {
if f.err != nil { if f.err != nil {
return nil, f.err return nil, f.err
} }
@ -51,13 +55,22 @@ func (f *fakeGRCC) Recv() (*corepb.CRLEntry, error) {
// fake timestamp to serve as the database's maximum notAfter value. // fake timestamp to serve as the database's maximum notAfter value.
type fakeSAC struct { type fakeSAC struct {
sapb.StorageAuthorityClient sapb.StorageAuthorityClient
grcc fakeGRCC revokedCerts revokedCertsStream
maxNotAfter time.Time revokedCertsByShard revokedCertsStream
leaseError error maxNotAfter time.Time
leaseError error
} }
func (f *fakeSAC) GetRevokedCerts(ctx context.Context, _ *sapb.GetRevokedCertsRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[corepb.CRLEntry], error) { func (f *fakeSAC) GetRevokedCerts(ctx context.Context, _ *sapb.GetRevokedCertsRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[corepb.CRLEntry], error) {
return &f.grcc, nil return &f.revokedCerts, nil
}
// Return some configured contents, but only for shard 2.
func (f *fakeSAC) GetRevokedCertsByShard(ctx context.Context, req *sapb.GetRevokedCertsByShardRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[corepb.CRLEntry], error) {
if req.ShardIdx == 2 {
return &f.revokedCertsByShard, nil
}
return &revokedCertsStream{}, nil
} }
func (f *fakeSAC) GetMaxExpiration(_ context.Context, req *emptypb.Empty, _ ...grpc.CallOption) (*timestamppb.Timestamp, error) { func (f *fakeSAC) GetMaxExpiration(_ context.Context, req *emptypb.Empty, _ ...grpc.CallOption) (*timestamppb.Timestamp, error) {
@ -71,10 +84,20 @@ func (f *fakeSAC) LeaseCRLShard(_ context.Context, req *sapb.LeaseCRLShardReques
return &sapb.LeaseCRLShardResponse{IssuerNameID: req.IssuerNameID, ShardIdx: req.MinShardIdx}, nil return &sapb.LeaseCRLShardResponse{IssuerNameID: req.IssuerNameID, ShardIdx: req.MinShardIdx}, nil
} }
// fakeGCC is a fake grpc.BidiStreamingClient which can be // generateCRLStream implements the streaming API returned from GenerateCRL.
// populated with some CRL entries or an error for use as the return value of //
// a faked GenerateCRL call. // Specifically it implements grpc.BidiStreamingClient.
type fakeGCC struct { //
// If it has non-nil error fields, it returns those on Send() or Recv().
//
// When it receives a CRL entry (on Send()), it records that entry internally, JSON serialized,
// with a newline between JSON objects.
//
// When it is asked for bytes of a signed CRL (Recv()), it sends those JSON serialized contents.
//
// We use JSON instead of CRL format because we're not testing the signing and formatting done
// by the CA, just the plumbing of different components together done by the crl-updater.
type generateCRLStream struct {
grpc.ClientStream grpc.ClientStream
chunks [][]byte chunks [][]byte
nextIdx int nextIdx int
@ -82,15 +105,36 @@ type fakeGCC struct {
recvErr error recvErr error
} }
func (f *fakeGCC) Send(*capb.GenerateCRLRequest) error { type crlEntry struct {
Serial string
Reason int32
RevokedAt time.Time
}
func (f *generateCRLStream) Send(req *capb.GenerateCRLRequest) error {
if f.sendErr != nil {
return f.sendErr
}
if t, ok := req.Payload.(*capb.GenerateCRLRequest_Entry); ok {
jsonBytes, err := json.Marshal(crlEntry{
Serial: t.Entry.Serial,
Reason: t.Entry.Reason,
RevokedAt: t.Entry.RevokedAt.AsTime(),
})
if err != nil {
return err
}
f.chunks = append(f.chunks, jsonBytes)
f.chunks = append(f.chunks, []byte("\n"))
}
return f.sendErr return f.sendErr
} }
func (f *fakeGCC) CloseSend() error { func (f *generateCRLStream) CloseSend() error {
return nil return nil
} }
func (f *fakeGCC) Recv() (*capb.GenerateCRLResponse, error) { func (f *generateCRLStream) Recv() (*capb.GenerateCRLResponse, error) {
if f.recvErr != nil { if f.recvErr != nil {
return nil, f.recvErr return nil, f.recvErr
} }
@ -102,43 +146,67 @@ func (f *fakeGCC) Recv() (*capb.GenerateCRLResponse, error) {
return nil, io.EOF return nil, io.EOF
} }
// fakeCGC is a fake capb.CRLGeneratorClient which can be populated with a // fakeCA acts as a fake CA (specifically implementing capb.CRLGeneratorClient).
// fakeGCC to be used as the return value for calls to GenerateCRL. //
type fakeCGC struct { // It always returns its field in response to `GenerateCRL`. Because this is a streaming
gcc fakeGCC // RPC, that return value is responsible for most of the work.
type fakeCA struct {
gcc generateCRLStream
} }
func (f *fakeCGC) GenerateCRL(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[capb.GenerateCRLRequest, capb.GenerateCRLResponse], error) { func (f *fakeCA) GenerateCRL(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[capb.GenerateCRLRequest, capb.GenerateCRLResponse], error) {
return &f.gcc, nil return &f.gcc, nil
} }
// fakeUCC is a fake grpc.ClientStreamingClient which can be populated with // recordingUploader acts as the streaming part of UploadCRL.
//
// Records all uploaded chunks in crlBody.
type recordingUploader struct {
grpc.ClientStream
crlBody []byte
}
func (r *recordingUploader) Send(req *cspb.UploadCRLRequest) error {
if t, ok := req.Payload.(*cspb.UploadCRLRequest_CrlChunk); ok {
r.crlBody = append(r.crlBody, t.CrlChunk...)
}
return nil
}
func (r *recordingUploader) CloseAndRecv() (*emptypb.Empty, error) {
return &emptypb.Empty{}, nil
}
// noopUploader is a fake grpc.ClientStreamingClient which can be populated with
// an error for use as the return value of a faked UploadCRL call. // an error for use as the return value of a faked UploadCRL call.
type fakeUCC struct { //
// It does nothing with uploaded contents.
type noopUploader struct {
grpc.ClientStream grpc.ClientStream
sendErr error sendErr error
recvErr error recvErr error
} }
func (f *fakeUCC) Send(*cspb.UploadCRLRequest) error { func (f *noopUploader) Send(*cspb.UploadCRLRequest) error {
return f.sendErr return f.sendErr
} }
func (f *fakeUCC) CloseAndRecv() (*emptypb.Empty, error) { func (f *noopUploader) CloseAndRecv() (*emptypb.Empty, error) {
if f.recvErr != nil { if f.recvErr != nil {
return nil, f.recvErr return nil, f.recvErr
} }
return &emptypb.Empty{}, nil return &emptypb.Empty{}, nil
} }
// fakeCSC is a fake cspb.CRLStorerClient which can be populated with a // fakeStorer is a fake cspb.CRLStorerClient which can be populated with an
// fakeUCC for use as the return value for calls to UploadCRL. // uploader stream for use as the return value for calls to UploadCRL.
type fakeCSC struct { type fakeStorer struct {
ucc fakeUCC uploaderStream grpc.ClientStreamingClient[cspb.UploadCRLRequest, emptypb.Empty]
} }
func (f *fakeCSC) UploadCRL(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[cspb.UploadCRLRequest, emptypb.Empty], error) { func (f *fakeStorer) UploadCRL(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[cspb.UploadCRLRequest, emptypb.Empty], error) {
return &f.ucc, nil return f.uploaderStream, nil
} }
func TestUpdateShard(t *testing.T) { func TestUpdateShard(t *testing.T) {
@ -157,9 +225,12 @@ func TestUpdateShard(t *testing.T) {
[]*issuance.Certificate{e1, r3}, []*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour, 2, 18*time.Hour, 24*time.Hour,
6*time.Hour, time.Minute, 1, 1, 6*time.Hour, time.Minute, 1, 1,
&fakeSAC{grcc: fakeGRCC{}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)}, &fakeSAC{
&fakeCGC{gcc: fakeGCC{}}, revokedCerts: revokedCertsStream{},
&fakeCSC{ucc: fakeUCC{}}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour),
},
&fakeCA{gcc: generateCRLStream{}},
&fakeStorer{uploaderStream: &noopUploader{}},
metrics.NoopRegisterer, blog.NewMock(), clk, metrics.NoopRegisterer, blog.NewMock(), clk,
) )
test.AssertNotError(t, err, "building test crlUpdater") test.AssertNotError(t, err, "building test crlUpdater")
@ -169,7 +240,91 @@ func TestUpdateShard(t *testing.T) {
} }
// Ensure that getting no results from the SA still works. // Ensure that getting no results from the SA still works.
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertNotError(t, err, "empty CRL")
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
"issuer": "(TEST) Elegant Elephant E1", "result": "success",
}, 1)
// Make a CRL with actual contents. Verify that the information makes it through
// each of the steps:
// - read from SA
// - write to CA and read the response
// - upload with CRL storer
//
// The final response should show up in the bytes recorded by our fake storer.
recordingUploader := &recordingUploader{}
now := timestamppb.Now()
cu.cs = &fakeStorer{uploaderStream: recordingUploader}
cu.sa = &fakeSAC{
revokedCerts: revokedCertsStream{
entries: []*corepb.CRLEntry{
{
Serial: "0311b5d430823cfa25b0fc85d14c54ee35",
Reason: int32(ocsp.KeyCompromise),
RevokedAt: now,
},
},
},
revokedCertsByShard: revokedCertsStream{
entries: []*corepb.CRLEntry{
{
Serial: "0311b5d430823cfa25b0fc85d14c54ee35",
Reason: int32(ocsp.KeyCompromise),
RevokedAt: now,
},
{
Serial: "037d6a05a0f6a975380456ae605cee9889",
Reason: int32(ocsp.AffiliationChanged),
RevokedAt: now,
},
{
Serial: "03aa617ab8ee58896ba082bfa25199c884",
Reason: int32(ocsp.Unspecified),
RevokedAt: now,
},
},
},
maxNotAfter: clk.Now().Add(90 * 24 * time.Hour),
}
// We ask for shard 2 specifically because GetRevokedCertsByShard only returns our
// certificate for that shard.
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 2, testChunks)
test.AssertNotError(t, err, "updateShard")
expectedEntries := map[string]int32{
"0311b5d430823cfa25b0fc85d14c54ee35": int32(ocsp.KeyCompromise),
"037d6a05a0f6a975380456ae605cee9889": int32(ocsp.AffiliationChanged),
"03aa617ab8ee58896ba082bfa25199c884": int32(ocsp.Unspecified),
}
for _, r := range bytes.Split(recordingUploader.crlBody, []byte("\n")) {
if len(r) == 0 {
continue
}
var entry crlEntry
err := json.Unmarshal(r, &entry)
if err != nil {
t.Fatalf("unmarshaling JSON: %s", err)
}
expectedReason, ok := expectedEntries[entry.Serial]
if !ok {
t.Errorf("CRL entry for %s was unexpected", entry.Serial)
}
if entry.Reason != expectedReason {
t.Errorf("CRL entry for %s had reason=%d, want %d", entry.Serial, entry.Reason, expectedReason)
}
delete(expectedEntries, entry.Serial)
}
// At this point the expectedEntries map should be empty; if it's not, emit an error
// for each remaining expectation.
for k, v := range expectedEntries {
t.Errorf("expected cert %s to be revoked for reason=%d, but it was not on the CRL", k, v)
}
cu.updatedCounter.Reset()
// Ensure that getting no results from the SA still works.
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertNotError(t, err, "empty CRL") test.AssertNotError(t, err, "empty CRL")
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{ test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
"issuer": "(TEST) Elegant Elephant E1", "result": "success", "issuer": "(TEST) Elegant Elephant E1", "result": "success",
@ -177,8 +332,8 @@ func TestUpdateShard(t *testing.T) {
cu.updatedCounter.Reset() cu.updatedCounter.Reset()
// Errors closing the Storer upload stream should bubble up. // Errors closing the Storer upload stream should bubble up.
cu.cs = &fakeCSC{ucc: fakeUCC{recvErr: sentinelErr}} cu.cs = &fakeStorer{uploaderStream: &noopUploader{recvErr: sentinelErr}}
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "storer error") test.AssertError(t, err, "storer error")
test.AssertContains(t, err.Error(), "closing CRLStorer upload stream") test.AssertContains(t, err.Error(), "closing CRLStorer upload stream")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
@ -188,8 +343,8 @@ func TestUpdateShard(t *testing.T) {
cu.updatedCounter.Reset() cu.updatedCounter.Reset()
// Errors sending to the Storer should bubble up sooner. // Errors sending to the Storer should bubble up sooner.
cu.cs = &fakeCSC{ucc: fakeUCC{sendErr: sentinelErr}} cu.cs = &fakeStorer{uploaderStream: &noopUploader{sendErr: sentinelErr}}
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "storer error") test.AssertError(t, err, "storer error")
test.AssertContains(t, err.Error(), "sending CRLStorer metadata") test.AssertContains(t, err.Error(), "sending CRLStorer metadata")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
@ -199,8 +354,8 @@ func TestUpdateShard(t *testing.T) {
cu.updatedCounter.Reset() cu.updatedCounter.Reset()
// Errors reading from the CA should bubble up sooner. // Errors reading from the CA should bubble up sooner.
cu.ca = &fakeCGC{gcc: fakeGCC{recvErr: sentinelErr}} cu.ca = &fakeCA{gcc: generateCRLStream{recvErr: sentinelErr}}
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "CA error") test.AssertError(t, err, "CA error")
test.AssertContains(t, err.Error(), "receiving CRL bytes") test.AssertContains(t, err.Error(), "receiving CRL bytes")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
@ -210,8 +365,8 @@ func TestUpdateShard(t *testing.T) {
cu.updatedCounter.Reset() cu.updatedCounter.Reset()
// Errors sending to the CA should bubble up sooner. // Errors sending to the CA should bubble up sooner.
cu.ca = &fakeCGC{gcc: fakeGCC{sendErr: sentinelErr}} cu.ca = &fakeCA{gcc: generateCRLStream{sendErr: sentinelErr}}
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "CA error") test.AssertError(t, err, "CA error")
test.AssertContains(t, err.Error(), "sending CA metadata") test.AssertContains(t, err.Error(), "sending CA metadata")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
@ -221,8 +376,8 @@ func TestUpdateShard(t *testing.T) {
cu.updatedCounter.Reset() cu.updatedCounter.Reset()
// Errors reading from the SA should bubble up soonest. // Errors reading from the SA should bubble up soonest.
cu.sa = &fakeSAC{grcc: fakeGRCC{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)} cu.sa = &fakeSAC{revokedCerts: revokedCertsStream{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)}
err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShard(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "database error") test.AssertError(t, err, "database error")
test.AssertContains(t, err.Error(), "retrieving entry from SA") test.AssertContains(t, err.Error(), "retrieving entry from SA")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
@ -250,9 +405,9 @@ func TestUpdateShardWithRetry(t *testing.T) {
[]*issuance.Certificate{e1, r3}, []*issuance.Certificate{e1, r3},
2, 18*time.Hour, 24*time.Hour, 2, 18*time.Hour, 24*time.Hour,
6*time.Hour, time.Minute, 1, 1, 6*time.Hour, time.Minute, 1, 1,
&fakeSAC{grcc: fakeGRCC{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)}, &fakeSAC{revokedCerts: revokedCertsStream{err: sentinelErr}, maxNotAfter: clk.Now().Add(90 * 24 * time.Hour)},
&fakeCGC{gcc: fakeGCC{}}, &fakeCA{gcc: generateCRLStream{}},
&fakeCSC{ucc: fakeUCC{}}, &fakeStorer{uploaderStream: &noopUploader{}},
metrics.NoopRegisterer, blog.NewMock(), clk, metrics.NoopRegisterer, blog.NewMock(), clk,
) )
test.AssertNotError(t, err, "building test crlUpdater") test.AssertNotError(t, err, "building test crlUpdater")
@ -264,7 +419,7 @@ func TestUpdateShardWithRetry(t *testing.T) {
// Ensure that having MaxAttempts set to 1 results in the clock not moving // Ensure that having MaxAttempts set to 1 results in the clock not moving
// forward at all. // forward at all.
startTime := cu.clk.Now() startTime := cu.clk.Now()
err = cu.updateShardWithRetry(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShardWithRetry(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "database error") test.AssertError(t, err, "database error")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
test.AssertEquals(t, cu.clk.Now(), startTime) test.AssertEquals(t, cu.clk.Now(), startTime)
@ -274,7 +429,7 @@ func TestUpdateShardWithRetry(t *testing.T) {
// in, so we have to be approximate. // in, so we have to be approximate.
cu.maxAttempts = 5 cu.maxAttempts = 5
startTime = cu.clk.Now() startTime = cu.clk.Now()
err = cu.updateShardWithRetry(ctx, cu.clk.Now(), e1.NameID(), 0, testChunks) err = cu.updateShardWithRetry(ctx, cu.clk.Now(), e1.NameID(), 1, testChunks)
test.AssertError(t, err, "database error") test.AssertError(t, err, "database error")
test.AssertErrorIs(t, err, sentinelErr) test.AssertErrorIs(t, err, sentinelErr)
t.Logf("start: %v", startTime) t.Logf("start: %v", startTime)
@ -399,3 +554,113 @@ func TestGetChunkAtTime(t *testing.T) {
_, err = GetChunkAtTime(shardWidth, numShards, atTime) _, err = GetChunkAtTime(shardWidth, numShards, atTime)
test.AssertError(t, err, "getting far-future chunk") test.AssertError(t, err, "getting far-future chunk")
} }
func TestAddFromStream(t *testing.T) {
now := time.Now()
yesterday := now.Add(-24 * time.Hour)
simpleEntry := &corepb.CRLEntry{
Serial: "abcdefg",
Reason: ocsp.CessationOfOperation,
RevokedAt: timestamppb.New(yesterday),
}
reRevokedEntry := &corepb.CRLEntry{
Serial: "abcdefg",
Reason: ocsp.KeyCompromise,
RevokedAt: timestamppb.New(now),
}
reRevokedEntryOld := &corepb.CRLEntry{
Serial: "abcdefg",
Reason: ocsp.KeyCompromise,
RevokedAt: timestamppb.New(now.Add(-48 * time.Hour)),
}
reRevokedEntryBadReason := &corepb.CRLEntry{
Serial: "abcdefg",
Reason: ocsp.AffiliationChanged,
RevokedAt: timestamppb.New(now),
}
type testCase struct {
name string
inputs [][]*corepb.CRLEntry
expected map[string]*corepb.CRLEntry
expectErr bool
}
testCases := []testCase{
{
name: "two streams with same entry",
inputs: [][]*corepb.CRLEntry{
{simpleEntry},
{simpleEntry},
},
expected: map[string]*corepb.CRLEntry{
simpleEntry.Serial: simpleEntry,
},
},
{
name: "re-revoked",
inputs: [][]*corepb.CRLEntry{
{simpleEntry},
{simpleEntry, reRevokedEntry},
},
expected: map[string]*corepb.CRLEntry{
simpleEntry.Serial: reRevokedEntry,
},
},
{
name: "re-revoked (newer shows up first)",
inputs: [][]*corepb.CRLEntry{
{reRevokedEntry, simpleEntry},
{simpleEntry},
},
expected: map[string]*corepb.CRLEntry{
simpleEntry.Serial: reRevokedEntry,
},
},
{
name: "re-revoked (wrong date)",
inputs: [][]*corepb.CRLEntry{
{simpleEntry},
{simpleEntry, reRevokedEntryOld},
},
expectErr: true,
},
{
name: "re-revoked (wrong reason)",
inputs: [][]*corepb.CRLEntry{
{simpleEntry},
{simpleEntry, reRevokedEntryBadReason},
},
expectErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
crlEntries := make(map[string]*corepb.CRLEntry)
var err error
for _, input := range tc.inputs {
_, err = addFromStream(crlEntries, &revokedCertsStream{entries: input})
if err != nil {
break
}
}
if tc.expectErr {
if err == nil {
t.Errorf("addFromStream=%+v, want error", crlEntries)
}
} else {
if err != nil {
t.Fatalf("addFromStream=%s, want no error", err)
}
if !reflect.DeepEqual(crlEntries, tc.expected) {
t.Errorf("addFromStream=%+v, want %+v", crlEntries, tc.expected)
}
}
})
}
}

View File

@ -1004,7 +1004,7 @@ func (ssa *SQLStorageAuthority) UpdateRevokedCertificate(ctx context.Context, re
// the "UPDATE certificateStatus SET revokedReason..." above if this // the "UPDATE certificateStatus SET revokedReason..." above if this
// query ever becomes the first or only query in this transaction. We are // query ever becomes the first or only query in this transaction. We are
// currently relying on the query above to exit early if the certificate // currently relying on the query above to exit early if the certificate
// does not have an appropriate status. // does not have an appropriate status and revocation reason.
err = tx.SelectOne( err = tx.SelectOne(
ctx, &rcm, `SELECT * FROM revokedCertificates WHERE serial = ?`, req.Serial) ctx, &rcm, `SELECT * FROM revokedCertificates WHERE serial = ?`, req.Serial)
if db.IsNoRows(err) { if db.IsNoRows(err) {