SA: gRPC methods for leasing CRL shards (#6940)
Add two new methods, LeaseCRLShard and UpdateCRLShard, to the SA gRPC interface. These methods work in concert both to prevent multiple instances of crl-updater from stepping on each others toes, and to lay the groundwork for a less bursty version of crl-updater in the future. Introduce a new database table, crlShards, which tracks the thisUpdate and nextUpdate timestamps of each CRL shard for each issuer. It also has a column "leasedUntil", which is also a timestamp. Grant the SA user read-write access to this table. LeaseCRLShard updates the leasedUntil column of the identified shard to the given time. It returns an error if the identified shard's leasedUntil timestamp is already in the future. This provides a mechanism for crl-updater instances to "lick the cookie", so to speak, marking CRL shards as "taken" so that multiple crl-updater instances don't attempt to work on the same shard at the same time. Using a timestamp has the added benefit that leases are guaranteed to expire, ensuring that we don't accidentally fail to work on a shard forever. LeaseCRLShard has a second mode of operation, when a range of potential shards is given in the request, rather than a single shard. In this mode, it returns the shard (within the given range) whose thisUpdate timestamp is oldest. (Shards with no thisUpdate timestamp, including because the requested range includes shard indices the database doesn't yet know about, count as older than any shard with any thisUpdate timestamp.) This allows crl-updater instances which don't care which shard they're working on to do the most urgent work first. UpdateCRLShard updates the thisUpdate and nextUpdate timestamps of the identified shard. This closes the loop with the second mode of LeaseCRLShard above: by updating the thisUpdate timestamp, the method marks the shard as no longer urgently needing to be worked on. IN-9220 tracks creating this table in staging and production Part of #6897
This commit is contained in:
parent
f6a005bc25
commit
3d80d8505e
|
|
@ -245,7 +245,7 @@ func (sa *StorageAuthorityReadOnly) GetCertificateStatus(_ context.Context, req
|
|||
Status: string(core.OCSPStatusRevoked),
|
||||
}, nil
|
||||
} else {
|
||||
return nil, errors.New("No cert status")
|
||||
return nil, errors.New("no cert status")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -355,7 +355,6 @@ func (sa *StorageAuthority) DeactivateRegistration(_ context.Context, _ *sapb.Re
|
|||
|
||||
// NewOrderAndAuthzs is a mock
|
||||
func (sa *StorageAuthority) NewOrderAndAuthzs(_ context.Context, req *sapb.NewOrderAndAuthzsRequest, _ ...grpc.CallOption) (*corepb.Order, error) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
response := &corepb.Order{
|
||||
// Fields from the input new order request.
|
||||
RegistrationID: req.NewOrder.RegistrationID,
|
||||
|
|
@ -556,7 +555,7 @@ func (sa *StorageAuthorityReadOnly) GetAuthorization2(ctx context.Context, id *s
|
|||
authz.ID = fmt.Sprintf("%d", authzIdExpired)
|
||||
return bgrpc.AuthzToPB(authz)
|
||||
case authzIdErrorResult:
|
||||
return nil, fmt.Errorf("Unspecified database error")
|
||||
return nil, fmt.Errorf("unspecified database error")
|
||||
case authzIdDiffAccount:
|
||||
exp := sa.clk.Now().AddDate(100, 0, 0)
|
||||
authz.RegistrationID = 2
|
||||
|
|
@ -593,6 +592,16 @@ func (sa *StorageAuthorityReadOnly) IncidentsForSerial(ctx context.Context, req
|
|||
return &sapb.Incidents{}, nil
|
||||
}
|
||||
|
||||
// LeaseCRLShard is a mock.
|
||||
func (sa *StorageAuthority) LeaseCRLShard(ctx context.Context, req *sapb.LeaseCRLShardRequest, _ ...grpc.CallOption) (*sapb.LeaseCRLShardResponse, error) {
|
||||
return nil, errors.New("unimplemented")
|
||||
}
|
||||
|
||||
// UpdateCRLShard is a mock.
|
||||
func (sa *StorageAuthority) UpdateCRLShard(ctx context.Context, req *sapb.UpdateCRLShardRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
return nil, errors.New("unimplemented")
|
||||
}
|
||||
|
||||
// Publisher is a mock
|
||||
type PublisherClient struct {
|
||||
// empty
|
||||
|
|
|
|||
|
|
@ -281,6 +281,7 @@ func initTables(dbMap *gorp.DbMap) {
|
|||
dbMap.AddTableWithName(keyHashModel{}, "keyHashToSerial").SetKeys(true, "ID")
|
||||
dbMap.AddTableWithName(incidentModel{}, "incidents").SetKeys(true, "ID")
|
||||
dbMap.AddTable(incidentSerialModel{})
|
||||
dbMap.AddTableWithName(crlShardModel{}, "crlShards").SetKeys(true, "ID")
|
||||
|
||||
// Read-only maps used for selecting subsets of columns.
|
||||
dbMap.AddTableWithName(CertStatusMetadata{}, "certificateStatus")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
-- +migrate Up
|
||||
-- SQL in section 'Up' is executed when this migration is applied
|
||||
|
||||
CREATE TABLE `crlShards` (
|
||||
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||
`issuerID` bigint(20) NOT NULL,
|
||||
`idx` int UNSIGNED NOT NULL,
|
||||
`thisUpdate` datetime,
|
||||
`nextUpdate` datetime,
|
||||
`leasedUntil` datetime NOT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `shardID` (`issuerID`, `idx`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- +migrate Down
|
||||
-- SQL section 'Down' is executed when this migration is rolled back
|
||||
|
||||
DROP TABLE `crlShards`;
|
||||
|
|
@ -32,6 +32,7 @@ GRANT SELECT,INSERT ON keyHashToSerial TO 'sa'@'localhost';
|
|||
GRANT SELECT,INSERT ON blockedKeys TO 'sa'@'localhost';
|
||||
GRANT SELECT,INSERT,UPDATE ON newOrdersRL TO 'sa'@'localhost';
|
||||
GRANT SELECT ON incidents TO 'sa'@'localhost';
|
||||
GRANT SELECT,INSERT,UPDATE ON crlShards TO 'sa'@'localhost';
|
||||
|
||||
GRANT SELECT ON certificates TO 'sa_ro'@'localhost';
|
||||
GRANT SELECT ON certificateStatus TO 'sa_ro'@'localhost';
|
||||
|
|
@ -50,6 +51,7 @@ GRANT SELECT ON keyHashToSerial TO 'sa_ro'@'localhost';
|
|||
GRANT SELECT ON blockedKeys TO 'sa_ro'@'localhost';
|
||||
GRANT SELECT ON newOrdersRL TO 'sa_ro'@'localhost';
|
||||
GRANT SELECT ON incidents TO 'sa_ro'@'localhost';
|
||||
GRANT SELECT ON crlShards TO 'sa_ro'@'localhost';
|
||||
|
||||
-- OCSP Responder
|
||||
GRANT SELECT ON certificateStatus TO 'ocsp_resp'@'localhost';
|
||||
|
|
|
|||
11
sa/model.go
11
sa/model.go
|
|
@ -1168,3 +1168,14 @@ func namesForOrder(s db.Selector, orderID int64) ([]string, error) {
|
|||
}
|
||||
return reversedNames, nil
|
||||
}
|
||||
|
||||
// crlShardModel represents one row in the crlShards table. The ThisUpdate and
|
||||
// NextUpdate fields are pointers because they are NULL-able columns.
|
||||
type crlShardModel struct {
|
||||
ID int64 `db:"id"`
|
||||
IssuerID int64 `db:"issuerID"`
|
||||
Idx int `db:"idx"`
|
||||
ThisUpdate *time.Time `db:"thisUpdate"`
|
||||
NextUpdate *time.Time `db:"nextUpdate"`
|
||||
LeasedUntil time.Time `db:"leasedUntil"`
|
||||
}
|
||||
|
|
|
|||
1254
sa/proto/sa.pb.go
1254
sa/proto/sa.pb.go
File diff suppressed because it is too large
Load Diff
|
|
@ -87,6 +87,8 @@ service StorageAuthority {
|
|||
rpc SetOrderProcessing(OrderRequest) returns (google.protobuf.Empty) {}
|
||||
rpc UpdateRegistration(core.Registration) returns (google.protobuf.Empty) {}
|
||||
rpc UpdateRevokedCertificate(RevokeCertificateRequest) returns (google.protobuf.Empty) {}
|
||||
rpc LeaseCRLShard(LeaseCRLShardRequest) returns (LeaseCRLShardResponse) {}
|
||||
rpc UpdateCRLShard(UpdateCRLShardRequest) returns (google.protobuf.Empty) {}
|
||||
}
|
||||
|
||||
message RegistrationID {
|
||||
|
|
@ -349,3 +351,22 @@ message RevocationStatus {
|
|||
int64 revokedReason = 2;
|
||||
google.protobuf.Timestamp revokedDate = 3; // Unix timestamp (nanoseconds)
|
||||
}
|
||||
|
||||
message LeaseCRLShardRequest {
|
||||
int64 issuerNameID = 1;
|
||||
int64 minShardIdx = 2;
|
||||
int64 maxShardIdx = 3;
|
||||
google.protobuf.Timestamp until = 4;
|
||||
}
|
||||
|
||||
message LeaseCRLShardResponse {
|
||||
int64 issuerNameID = 1;
|
||||
int64 shardIdx = 2;
|
||||
}
|
||||
|
||||
message UpdateCRLShardRequest {
|
||||
int64 issuerNameID = 1;
|
||||
int64 shardIdx = 2;
|
||||
google.protobuf.Timestamp thisUpdate = 3;
|
||||
google.protobuf.Timestamp nextUpdate = 4;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1185,6 +1185,8 @@ type StorageAuthorityClient interface {
|
|||
SetOrderProcessing(ctx context.Context, in *OrderRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
UpdateRegistration(ctx context.Context, in *proto.Registration, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
UpdateRevokedCertificate(ctx context.Context, in *RevokeCertificateRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
LeaseCRLShard(ctx context.Context, in *LeaseCRLShardRequest, opts ...grpc.CallOption) (*LeaseCRLShardResponse, error)
|
||||
UpdateCRLShard(ctx context.Context, in *UpdateCRLShardRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
}
|
||||
|
||||
type storageAuthorityClient struct {
|
||||
|
|
@ -1637,6 +1639,24 @@ func (c *storageAuthorityClient) UpdateRevokedCertificate(ctx context.Context, i
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *storageAuthorityClient) LeaseCRLShard(ctx context.Context, in *LeaseCRLShardRequest, opts ...grpc.CallOption) (*LeaseCRLShardResponse, error) {
|
||||
out := new(LeaseCRLShardResponse)
|
||||
err := c.cc.Invoke(ctx, "/sa.StorageAuthority/LeaseCRLShard", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *storageAuthorityClient) UpdateCRLShard(ctx context.Context, in *UpdateCRLShardRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, "/sa.StorageAuthority/UpdateCRLShard", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// StorageAuthorityServer is the server API for StorageAuthority service.
|
||||
// All implementations must embed UnimplementedStorageAuthorityServer
|
||||
// for forward compatibility
|
||||
|
|
@ -1687,6 +1707,8 @@ type StorageAuthorityServer interface {
|
|||
SetOrderProcessing(context.Context, *OrderRequest) (*emptypb.Empty, error)
|
||||
UpdateRegistration(context.Context, *proto.Registration) (*emptypb.Empty, error)
|
||||
UpdateRevokedCertificate(context.Context, *RevokeCertificateRequest) (*emptypb.Empty, error)
|
||||
LeaseCRLShard(context.Context, *LeaseCRLShardRequest) (*LeaseCRLShardResponse, error)
|
||||
UpdateCRLShard(context.Context, *UpdateCRLShardRequest) (*emptypb.Empty, error)
|
||||
mustEmbedUnimplementedStorageAuthorityServer()
|
||||
}
|
||||
|
||||
|
|
@ -1826,6 +1848,12 @@ func (UnimplementedStorageAuthorityServer) UpdateRegistration(context.Context, *
|
|||
func (UnimplementedStorageAuthorityServer) UpdateRevokedCertificate(context.Context, *RevokeCertificateRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method UpdateRevokedCertificate not implemented")
|
||||
}
|
||||
func (UnimplementedStorageAuthorityServer) LeaseCRLShard(context.Context, *LeaseCRLShardRequest) (*LeaseCRLShardResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method LeaseCRLShard not implemented")
|
||||
}
|
||||
func (UnimplementedStorageAuthorityServer) UpdateCRLShard(context.Context, *UpdateCRLShardRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method UpdateCRLShard not implemented")
|
||||
}
|
||||
func (UnimplementedStorageAuthorityServer) mustEmbedUnimplementedStorageAuthorityServer() {}
|
||||
|
||||
// UnsafeStorageAuthorityServer may be embedded to opt out of forward compatibility for this service.
|
||||
|
|
@ -2637,6 +2665,42 @@ func _StorageAuthority_UpdateRevokedCertificate_Handler(srv interface{}, ctx con
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _StorageAuthority_LeaseCRLShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(LeaseCRLShardRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(StorageAuthorityServer).LeaseCRLShard(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/sa.StorageAuthority/LeaseCRLShard",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(StorageAuthorityServer).LeaseCRLShard(ctx, req.(*LeaseCRLShardRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _StorageAuthority_UpdateCRLShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(UpdateCRLShardRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(StorageAuthorityServer).UpdateCRLShard(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/sa.StorageAuthority/UpdateCRLShard",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(StorageAuthorityServer).UpdateCRLShard(ctx, req.(*UpdateCRLShardRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// StorageAuthority_ServiceDesc is the grpc.ServiceDesc for StorageAuthority service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
|
|
@ -2812,6 +2876,14 @@ var StorageAuthority_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "UpdateRevokedCertificate",
|
||||
Handler: _StorageAuthority_UpdateRevokedCertificate_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "LeaseCRLShard",
|
||||
Handler: _StorageAuthority_LeaseCRLShard_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "UpdateCRLShard",
|
||||
Handler: _StorageAuthority_UpdateCRLShard_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
|
|
|
|||
205
sa/sa.go
205
sa/sa.go
|
|
@ -915,3 +915,208 @@ func (ssa *SQLStorageAuthority) Health(ctx context.Context) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LeaseCRLShard marks a single crlShards row as leased until the given time.
|
||||
// If the request names a specific shard, this function will return an error
|
||||
// if that shard is already leased. Otherwise, this function will return the
|
||||
// index of the oldest shard for the given issuer.
|
||||
func (ssa *SQLStorageAuthority) LeaseCRLShard(ctx context.Context, req *sapb.LeaseCRLShardRequest) (*sapb.LeaseCRLShardResponse, error) {
|
||||
if core.IsAnyNilOrZero(req.Until, req.IssuerNameID) {
|
||||
return nil, errIncompleteRequest
|
||||
}
|
||||
if req.Until.AsTime().Before(ssa.clk.Now()) {
|
||||
return nil, fmt.Errorf("lease timestamp must be in the future, got %q", req.Until.AsTime())
|
||||
}
|
||||
|
||||
if req.MinShardIdx == req.MaxShardIdx {
|
||||
return ssa.leaseSpecificCRLShard(ctx, req)
|
||||
}
|
||||
return ssa.leaseOldestCRLShard(ctx, req)
|
||||
}
|
||||
|
||||
// leaseOldestCRLShard finds the oldest unleased crl shard for the given issuer
|
||||
// and then leases it. Shards within the requested range which have never been
|
||||
// leased or are previously-unknown indices are considered older than any other
|
||||
// shard. It returns an error if all shards for the issuer are already leased.
|
||||
func (ssa *SQLStorageAuthority) leaseOldestCRLShard(ctx context.Context, req *sapb.LeaseCRLShardRequest) (*sapb.LeaseCRLShardResponse, error) {
|
||||
shardIdx, err := db.WithTransaction(ctx, ssa.dbMap, func(txWithCtx db.Executor) (interface{}, error) {
|
||||
var shards []*crlShardModel
|
||||
_, err := txWithCtx.Select(
|
||||
&shards,
|
||||
`SELECT id, issuerID, idx, thisUpdate, nextUpdate, leasedUntil
|
||||
FROM crlShards
|
||||
WHERE issuerID = ?`,
|
||||
req.IssuerNameID,
|
||||
)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Convert the slice to a map to detect shards that we expect to exist, but
|
||||
// don't. At the same time, determine the oldest known shard.
|
||||
shardMap := make(map[int]*crlShardModel, len(shards))
|
||||
var oldest *crlShardModel
|
||||
for _, shard := range shards {
|
||||
if shard.Idx < int(req.MinShardIdx) || shard.Idx > int(req.MaxShardIdx) {
|
||||
continue
|
||||
}
|
||||
shardMap[shard.Idx] = shard
|
||||
if shard.LeasedUntil.After(ssa.clk.Now()) {
|
||||
continue
|
||||
}
|
||||
if oldest == nil ||
|
||||
(shard.ThisUpdate == nil && oldest.ThisUpdate != nil) ||
|
||||
shard.ThisUpdate.Before(*oldest.ThisUpdate) {
|
||||
oldest = shard
|
||||
}
|
||||
}
|
||||
|
||||
if oldest == nil {
|
||||
return -1, fmt.Errorf("issuer %d has no unleased shards in range %d - %d", req.IssuerNameID, req.MinShardIdx, req.MaxShardIdx)
|
||||
}
|
||||
|
||||
// Determine which shard index we want to lease: by default the oldest, but
|
||||
// an arbitrary missing one if any are missing.
|
||||
shardIdx := oldest.Idx
|
||||
needToInsert := false
|
||||
for i := req.MaxShardIdx; i >= req.MinShardIdx; i-- {
|
||||
_, ok := shardMap[int(i)]
|
||||
if !ok {
|
||||
shardIdx = int(i)
|
||||
needToInsert = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if needToInsert {
|
||||
_, err = txWithCtx.Exec(
|
||||
`INSERT INTO crlShards (issuerID, idx, leasedUntil)
|
||||
VALUES (?, ?, ?)`,
|
||||
req.IssuerNameID,
|
||||
shardIdx,
|
||||
req.Until.AsTime(),
|
||||
)
|
||||
} else {
|
||||
_, err = txWithCtx.Exec(
|
||||
`UPDATE crlShards
|
||||
SET leasedUntil = ?
|
||||
WHERE issuerID = ?
|
||||
AND idx = ?
|
||||
LIMIT 1`,
|
||||
req.Until.AsTime(),
|
||||
req.IssuerNameID,
|
||||
shardIdx,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return shardIdx, err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &sapb.LeaseCRLShardResponse{
|
||||
IssuerNameID: req.IssuerNameID,
|
||||
ShardIdx: int64(shardIdx.(int)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// leaseSpecificCRLShard attempts to lease the crl shard for the given issuer
|
||||
// and shard index. It returns an error if the specified shard is already
|
||||
// leased.
|
||||
func (ssa *SQLStorageAuthority) leaseSpecificCRLShard(ctx context.Context, req *sapb.LeaseCRLShardRequest) (*sapb.LeaseCRLShardResponse, error) {
|
||||
if req.MinShardIdx != req.MaxShardIdx {
|
||||
return nil, fmt.Errorf("request must identify a single shard index: %d != %d", req.MinShardIdx, req.MaxShardIdx)
|
||||
}
|
||||
|
||||
_, err := db.WithTransaction(ctx, ssa.dbMap, func(txWithCtx db.Executor) (interface{}, error) {
|
||||
res, err := txWithCtx.Exec(
|
||||
`UPDATE crlShards
|
||||
SET leasedUntil = ?
|
||||
WHERE issuerID = ?
|
||||
AND idx = ?
|
||||
AND leasedUntil < ?
|
||||
LIMIT 1`,
|
||||
req.Until.AsTime(),
|
||||
req.IssuerNameID,
|
||||
req.MinShardIdx,
|
||||
ssa.clk.Now(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return nil, fmt.Errorf("shard %d for issuer %d already leased", req.MinShardIdx, req.IssuerNameID)
|
||||
}
|
||||
if rowsAffected != 1 {
|
||||
return nil, errors.New("update affected unexpected number of rows")
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &sapb.LeaseCRLShardResponse{
|
||||
IssuerNameID: req.IssuerNameID,
|
||||
ShardIdx: req.MinShardIdx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UpdateCRLShard updates the thisUpdate and nextUpdate timestamps of a CRL
|
||||
// shard. It rejects the update if it would cause the thisUpdate timestamp to
|
||||
// move backwards. It does *not* reject the update if the shard is no longer
|
||||
// leased: although this would be unexpected (because the lease timestamp should
|
||||
// be the same as the crl-updater's context expiration), it's not inherently a
|
||||
// sign of an update that should be skipped. It does reject the update if the
|
||||
// identified CRL shard does not exist in the database (it should exist, as
|
||||
// rows are created if necessary when leased).
|
||||
func (ssa *SQLStorageAuthority) UpdateCRLShard(ctx context.Context, req *sapb.UpdateCRLShardRequest) (*emptypb.Empty, error) {
|
||||
if core.IsAnyNilOrZero(req.IssuerNameID, req.ThisUpdate, req.NextUpdate) {
|
||||
return nil, errIncompleteRequest
|
||||
}
|
||||
|
||||
_, err := db.WithTransaction(ctx, ssa.dbMap, func(txWithCtx db.Executor) (interface{}, error) {
|
||||
res, err := txWithCtx.Exec(
|
||||
`UPDATE crlShards
|
||||
SET thisUpdate = ?, nextUpdate = ?
|
||||
WHERE issuerID = ?
|
||||
AND idx = ?
|
||||
AND thisUpdate < ?
|
||||
LIMIT 1`,
|
||||
req.ThisUpdate.AsTime(),
|
||||
req.NextUpdate.AsTime(),
|
||||
req.IssuerNameID,
|
||||
req.ShardIdx,
|
||||
req.ThisUpdate.AsTime(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return nil, fmt.Errorf("unable to update shard %d for issuer %d", req.ShardIdx, req.IssuerNameID)
|
||||
}
|
||||
if rowsAffected != 1 {
|
||||
return nil, errors.New("update affected unexpected number of rows")
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
|
|
|||
333
sa/sa_test.go
333
sa/sa_test.go
|
|
@ -41,6 +41,7 @@ import (
|
|||
"golang.org/x/crypto/ocsp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"gopkg.in/go-jose/go-jose.v2"
|
||||
)
|
||||
|
||||
|
|
@ -3083,7 +3084,6 @@ func TestSerialsForIncident(t *testing.T) {
|
|||
"1335": true, "1336": true, "1337": true, "1338": true,
|
||||
}
|
||||
for i := range expectedSerials {
|
||||
mrand.Seed(time.Now().Unix())
|
||||
randInt := func() int64 { return mrand.Int63() }
|
||||
_, err := testIncidentsDbMap.Exec(
|
||||
fmt.Sprintf("INSERT INTO incident_foo (%s) VALUES ('%s', %d, %d, '%s')",
|
||||
|
|
@ -3253,3 +3253,334 @@ func TestGetMaxExpiration(t *testing.T) {
|
|||
test.AssertNotError(t, err, "getting last expriy should succeed")
|
||||
test.Assert(t, lastExpiry.AsTime().Equal(eeCert.NotAfter), "times should be equal")
|
||||
}
|
||||
|
||||
func TestLeaseOldestCRLShard(t *testing.T) {
|
||||
if os.Getenv("BOULDER_CONFIG_DIR") != "test/config-next" {
|
||||
t.Skip("Test requires crlShards database table")
|
||||
}
|
||||
|
||||
sa, clk, cleanUp := initSA(t)
|
||||
defer cleanUp()
|
||||
|
||||
// Create 8 shards: 4 for each of 2 issuers. For each issuer, one shard is
|
||||
// currently leased, three are available, and one of those failed to update.
|
||||
_, err := sa.dbMap.Exec(
|
||||
`INSERT INTO crlShards (issuerID, idx, thisUpdate, nextUpdate, leasedUntil) VALUES
|
||||
(1, 0, ?, ?, ?),
|
||||
(1, 1, ?, ?, ?),
|
||||
(1, 2, ?, ?, ?),
|
||||
(1, 3, NULL, NULL, ?),
|
||||
(2, 0, ?, ?, ?),
|
||||
(2, 1, ?, ?, ?),
|
||||
(2, 2, ?, ?, ?),
|
||||
(2, 3, NULL, NULL, ?);`,
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
)
|
||||
test.AssertNotError(t, err, "setting up test shards")
|
||||
|
||||
until := clk.Now().Add(time.Hour).Truncate(time.Second).UTC()
|
||||
var untilModel struct {
|
||||
LeasedUntil time.Time `db:"leasedUntil"`
|
||||
}
|
||||
|
||||
// Leasing from a fully-leased subset should fail.
|
||||
_, err = sa.leaseOldestCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 0,
|
||||
MaxShardIdx: 0,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertError(t, err, "leasing when all shards are leased")
|
||||
|
||||
// Leasing any known shard should return the never-before-leased one (3).
|
||||
res, err := sa.leaseOldestCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 0,
|
||||
MaxShardIdx: 3,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "leasing available shard")
|
||||
test.AssertEquals(t, res.IssuerNameID, int64(1))
|
||||
test.AssertEquals(t, res.ShardIdx, int64(3))
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&untilModel,
|
||||
`SELECT leasedUntil FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
res.IssuerNameID,
|
||||
res.ShardIdx,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated lease timestamp")
|
||||
test.Assert(t, untilModel.LeasedUntil.Equal(until), "checking updated lease timestamp")
|
||||
|
||||
// Leasing any known shard *again* should now return the oldest one (1).
|
||||
res, err = sa.leaseOldestCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 0,
|
||||
MaxShardIdx: 3,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "leasing available shard")
|
||||
test.AssertEquals(t, res.IssuerNameID, int64(1))
|
||||
test.AssertEquals(t, res.ShardIdx, int64(1))
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&untilModel,
|
||||
`SELECT leasedUntil FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
res.IssuerNameID,
|
||||
res.ShardIdx,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated lease timestamp")
|
||||
test.Assert(t, untilModel.LeasedUntil.Equal(until), "checking updated lease timestamp")
|
||||
|
||||
// Leasing from a superset of known shards should succeed and return one of
|
||||
// the previously-unknown shards.
|
||||
res, err = sa.leaseOldestCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 2,
|
||||
MinShardIdx: 0,
|
||||
MaxShardIdx: 7,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "leasing available shard")
|
||||
test.AssertEquals(t, res.IssuerNameID, int64(2))
|
||||
test.Assert(t, res.ShardIdx >= 4, "checking leased index")
|
||||
test.Assert(t, res.ShardIdx <= 7, "checking leased index")
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&untilModel,
|
||||
`SELECT leasedUntil FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
res.IssuerNameID,
|
||||
res.ShardIdx,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated lease timestamp")
|
||||
test.Assert(t, untilModel.LeasedUntil.Equal(until), "checking updated lease timestamp")
|
||||
}
|
||||
|
||||
func TestLeaseSpecificCRLShard(t *testing.T) {
|
||||
if os.Getenv("BOULDER_CONFIG_DIR") != "test/config-next" {
|
||||
t.Skip("Test requires crlShards database table")
|
||||
}
|
||||
|
||||
sa, clk, cleanUp := initSA(t)
|
||||
defer cleanUp()
|
||||
|
||||
// Create 8 shards: 4 for each of 2 issuers. For each issuer, one shard is
|
||||
// currently leased, three are available, and one of those failed to update.
|
||||
_, err := sa.dbMap.Exec(
|
||||
`INSERT INTO crlShards (issuerID, idx, thisUpdate, nextUpdate, leasedUntil) VALUES
|
||||
(1, 0, ?, ?, ?),
|
||||
(1, 1, ?, ?, ?),
|
||||
(1, 2, ?, ?, ?),
|
||||
(1, 3, NULL, NULL, ?),
|
||||
(2, 0, ?, ?, ?),
|
||||
(2, 1, ?, ?, ?),
|
||||
(2, 2, ?, ?, ?),
|
||||
(2, 3, NULL, NULL, ?);`,
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
)
|
||||
test.AssertNotError(t, err, "setting up test shards")
|
||||
|
||||
until := clk.Now().Add(time.Hour).Truncate(time.Second).UTC()
|
||||
var untilModel struct {
|
||||
LeasedUntil time.Time `db:"leasedUntil"`
|
||||
}
|
||||
|
||||
// Leasing an unleased shard should work.
|
||||
res, err := sa.leaseSpecificCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 1,
|
||||
MaxShardIdx: 1,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "leasing available shard")
|
||||
test.AssertEquals(t, res.IssuerNameID, int64(1))
|
||||
test.AssertEquals(t, res.ShardIdx, int64(1))
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&untilModel,
|
||||
`SELECT leasedUntil FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
res.IssuerNameID,
|
||||
res.ShardIdx,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated lease timestamp")
|
||||
test.Assert(t, untilModel.LeasedUntil.Equal(until), "checking updated lease timestamp")
|
||||
|
||||
// Leasing a never-before-leased shard should work.
|
||||
res, err = sa.leaseSpecificCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 2,
|
||||
MinShardIdx: 3,
|
||||
MaxShardIdx: 3,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "leasing available shard")
|
||||
test.AssertEquals(t, res.IssuerNameID, int64(2))
|
||||
test.AssertEquals(t, res.ShardIdx, int64(3))
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&untilModel,
|
||||
`SELECT leasedUntil FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
res.IssuerNameID,
|
||||
res.ShardIdx,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated lease timestamp")
|
||||
test.Assert(t, untilModel.LeasedUntil.Equal(until), "checking updated lease timestamp")
|
||||
|
||||
// Leasing a leased shard should fail.
|
||||
_, err = sa.leaseSpecificCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 0,
|
||||
MaxShardIdx: 0,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertError(t, err, "leasing unavailable shard")
|
||||
|
||||
// Leasing an unknown shard should fail (because this method will only be used
|
||||
// in the short term, and should go away before we change shard counts).
|
||||
_, err = sa.leaseSpecificCRLShard(
|
||||
context.Background(),
|
||||
&sapb.LeaseCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
MinShardIdx: 9,
|
||||
MaxShardIdx: 9,
|
||||
Until: timestamppb.New(until),
|
||||
},
|
||||
)
|
||||
test.AssertError(t, err, "leasing unknown shard")
|
||||
}
|
||||
|
||||
func TestUpdateCRLShard(t *testing.T) {
|
||||
if os.Getenv("BOULDER_CONFIG_DIR") != "test/config-next" {
|
||||
t.Skip("Test requires crlShards database table")
|
||||
}
|
||||
|
||||
sa, clk, cleanUp := initSA(t)
|
||||
defer cleanUp()
|
||||
|
||||
// Create 8 shards: 4 for each of 2 issuers. For each issuer, one shard is
|
||||
// currently leased, three are available, and one of those failed to update.
|
||||
_, err := sa.dbMap.Exec(
|
||||
`INSERT INTO crlShards (issuerID, idx, thisUpdate, nextUpdate, leasedUntil) VALUES
|
||||
(1, 0, ?, ?, ?),
|
||||
(1, 1, ?, ?, ?),
|
||||
(1, 2, ?, ?, ?),
|
||||
(1, 3, NULL, NULL, ?),
|
||||
(2, 0, ?, ?, ?),
|
||||
(2, 1, ?, ?, ?),
|
||||
(2, 2, ?, ?, ?),
|
||||
(2, 3, NULL, NULL, ?);`,
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
clk.Now().Add(-7*24*time.Hour), clk.Now().Add(3*24*time.Hour), clk.Now().Add(time.Hour),
|
||||
clk.Now().Add(-6*24*time.Hour), clk.Now().Add(4*24*time.Hour), clk.Now().Add(-6*24*time.Hour),
|
||||
clk.Now().Add(-5*24*time.Hour), clk.Now().Add(5*24*time.Hour), clk.Now().Add(-5*24*time.Hour),
|
||||
clk.Now().Add(-4*24*time.Hour),
|
||||
)
|
||||
test.AssertNotError(t, err, "setting up test shards")
|
||||
|
||||
thisUpdate := clk.Now().Truncate(time.Second).UTC()
|
||||
var thisUpdateModel struct {
|
||||
ThisUpdate time.Time `db:"thisUpdate"`
|
||||
}
|
||||
|
||||
// Updating a leased shard should work.
|
||||
_, err = sa.UpdateCRLShard(
|
||||
context.Background(),
|
||||
&sapb.UpdateCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
ShardIdx: 0,
|
||||
ThisUpdate: timestamppb.New(thisUpdate),
|
||||
NextUpdate: timestamppb.New(thisUpdate.Add(10 * 24 * time.Hour)),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "updating leased shard")
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&thisUpdateModel,
|
||||
`SELECT thisUpdate FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
1,
|
||||
0,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated thisUpdate timestamp")
|
||||
test.Assert(t, thisUpdateModel.ThisUpdate.Equal(thisUpdate), "checking updated thisUpdate timestamp")
|
||||
|
||||
// Updating an unleased shard should work.
|
||||
_, err = sa.UpdateCRLShard(
|
||||
context.Background(),
|
||||
&sapb.UpdateCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
ShardIdx: 1,
|
||||
ThisUpdate: timestamppb.New(thisUpdate),
|
||||
NextUpdate: timestamppb.New(thisUpdate.Add(10 * 24 * time.Hour)),
|
||||
},
|
||||
)
|
||||
test.AssertNotError(t, err, "updating unleased shard")
|
||||
|
||||
err = sa.dbMap.SelectOne(
|
||||
&thisUpdateModel,
|
||||
`SELECT thisUpdate FROM crlShards WHERE issuerID = ? AND idx = ? LIMIT 1`,
|
||||
1,
|
||||
1,
|
||||
)
|
||||
test.AssertNotError(t, err, "getting updated thisUpdate timestamp")
|
||||
test.Assert(t, thisUpdateModel.ThisUpdate.Equal(thisUpdate), "checking updated thisUpdate timestamp")
|
||||
|
||||
// Updating a shard to an earlier time should fail.
|
||||
_, err = sa.UpdateCRLShard(
|
||||
context.Background(),
|
||||
&sapb.UpdateCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
ShardIdx: 1,
|
||||
ThisUpdate: timestamppb.New(thisUpdate.Add(-24 * time.Hour)),
|
||||
NextUpdate: timestamppb.New(thisUpdate.Add(9 * 24 * time.Hour)),
|
||||
},
|
||||
)
|
||||
test.AssertError(t, err, "updating shard to an earlier time")
|
||||
|
||||
// Updating an unknown shard should fail.
|
||||
_, err = sa.UpdateCRLShard(
|
||||
context.Background(),
|
||||
&sapb.UpdateCRLShardRequest{
|
||||
IssuerNameID: 1,
|
||||
ShardIdx: 4,
|
||||
ThisUpdate: timestamppb.New(thisUpdate),
|
||||
NextUpdate: timestamppb.New(thisUpdate.Add(10 * 24 * time.Hour)),
|
||||
},
|
||||
)
|
||||
test.AssertError(t, err, "updating shard to an earlier time")
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue