Update total issued count asynchronously. (#2246)

Previously the lock on total issued count would exacerbate problems when the
count query was slow, which it often is.

Fixes #1809.
This commit is contained in:
Jacob Hoffman-Andrews 2016-10-20 14:17:34 -07:00 committed by Roland Bracewell Shoemaker
parent 1f4bc0ec87
commit 1958dc9065
3 changed files with 90 additions and 59 deletions

View File

@ -197,6 +197,9 @@ func main() {
rai.CA = cac
rai.SA = sac
err = rai.UpdateIssuedCountForever()
cmd.FailOnError(err, "Updating total issuance count")
ras, err := rpc.NewAmqpRPCServer(amqpConf, c.RA.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create RA RPC server")
err = rpc.NewRegistrationAuthorityServer(ras, rai, logger)

124
ra/ra.go
View File

@ -55,13 +55,14 @@ type RegistrationAuthorityImpl struct {
authorizationLifetime time.Duration
pendingAuthorizationLifetime time.Duration
rlPolicies ratelimit.Limits
tiMu *sync.RWMutex
totalIssuedCache int
lastIssuedCount *time.Time
maxContactsPerReg int
maxNames int
forceCNFromSAN bool
reuseValidAuthz bool
// tiMu protects totalIssuedCount and totalIssuedLastUpdate
tiMu *sync.RWMutex
totalIssuedCount int
totalIssuedLastUpdate time.Time
maxContactsPerReg int
maxNames int
forceCNFromSAN bool
reuseValidAuthz bool
regByIPStats metrics.Scope
pendAuthByRegIDStats metrics.Scope
@ -116,6 +117,45 @@ func (ra *RegistrationAuthorityImpl) rateLimitPoliciesLoadError(err error) {
ra.log.Err(fmt.Sprintf("error reloading rate limit policy: %s", err))
}
// Run this to continually update the totalIssuedCount field of this
// RA by calling out to the SA. It will run one update before returning, and
// return an error if that update failed.
func (ra *RegistrationAuthorityImpl) UpdateIssuedCountForever() error {
if err := ra.updateIssuedCount(); err != nil {
return err
}
go func() {
for {
_ = ra.updateIssuedCount()
time.Sleep(1 * time.Minute)
}
}()
return nil
}
func (ra *RegistrationAuthorityImpl) updateIssuedCount() error {
totalCertLimit := ra.rlPolicies.TotalCertificates()
if totalCertLimit.Enabled() {
now := ra.clk.Now()
// We don't have a Context here, so use the background context. Note that a
// timeout is still imposed by our RPC layer.
count, err := ra.SA.CountCertificatesRange(
context.Background(),
now.Add(-totalCertLimit.Window.Duration),
now,
)
if err != nil {
ra.log.AuditErr(fmt.Sprintf("updating total issued count: %s", err))
return err
}
ra.tiMu.Lock()
ra.totalIssuedCount = int(count)
ra.totalIssuedLastUpdate = ra.clk.Now()
ra.tiMu.Unlock()
}
return nil
}
const (
unparseableEmailDetail = "not a valid e-mail address"
emptyDNSResponseDetail = "empty DNS response"
@ -181,48 +221,6 @@ type certificateRequestEvent struct {
Error string `json:",omitempty"`
}
var issuanceCountCacheLife = 1 * time.Minute
// issuanceCountInvalid checks if the current issuance count is invalid either
// because it hasn't been set yet or because it has expired. This method expects
// that the caller holds either a R or W ra.tiMu lock.
func (ra *RegistrationAuthorityImpl) issuanceCountInvalid(now time.Time) bool {
return ra.lastIssuedCount == nil || ra.lastIssuedCount.Add(issuanceCountCacheLife).Before(now)
}
func (ra *RegistrationAuthorityImpl) getIssuanceCount(ctx context.Context) (int, error) {
ra.tiMu.RLock()
if ra.issuanceCountInvalid(ra.clk.Now()) {
ra.tiMu.RUnlock()
return ra.setIssuanceCount(ctx)
}
count := ra.totalIssuedCache
ra.tiMu.RUnlock()
return count, nil
}
func (ra *RegistrationAuthorityImpl) setIssuanceCount(ctx context.Context) (int, error) {
ra.tiMu.Lock()
defer ra.tiMu.Unlock()
totalCertWindow := ra.rlPolicies.TotalCertificates().Window.Duration
now := ra.clk.Now()
if ra.issuanceCountInvalid(now) {
count, err := ra.SA.CountCertificatesRange(
ctx,
now.Add(-totalCertWindow),
now,
)
if err != nil {
return 0, err
}
ra.totalIssuedCache = int(count)
ra.lastIssuedCount = &now
}
return ra.totalIssuedCache, nil
}
// noRegistrationID is used for the regID parameter to GetThreshold when no
// registration-based overrides are necessary.
const noRegistrationID = -1
@ -738,20 +736,32 @@ func (ra *RegistrationAuthorityImpl) checkCertificatesPerFQDNSetLimit(ctx contex
return nil
}
func (ra *RegistrationAuthorityImpl) checkTotalCertificatesLimit() error {
totalCertLimits := ra.rlPolicies.TotalCertificates()
ra.tiMu.RLock()
defer ra.tiMu.RUnlock()
// If last update of the total issued count was more than five minutes ago,
// or not yet updated, fail.
if ra.clk.Now().After(ra.totalIssuedLastUpdate.Add(5*time.Minute)) ||
ra.totalIssuedLastUpdate.IsZero() {
return core.InternalServerError(fmt.Sprintf("Total certificate count out of date: updated %s", ra.totalIssuedLastUpdate))
}
if ra.totalIssuedCount >= totalCertLimits.Threshold {
ra.totalCertsStats.Inc("Exceeded", 1)
ra.log.Info(fmt.Sprintf("Rate limit exceeded, TotalCertificates, totalIssued: %d, lastUpdated %s", ra.totalIssuedCount, ra.totalIssuedLastUpdate))
return core.RateLimitedError("Global certificate issuance limit reached. Try again in an hour.")
}
ra.totalCertsStats.Inc("Pass", 1)
return nil
}
func (ra *RegistrationAuthorityImpl) checkLimits(ctx context.Context, names []string, regID int64) error {
totalCertLimits := ra.rlPolicies.TotalCertificates()
if totalCertLimits.Enabled() {
totalIssued, err := ra.getIssuanceCount(ctx)
err := ra.checkTotalCertificatesLimit()
if err != nil {
return err
}
if totalIssued >= totalCertLimits.Threshold {
domains := strings.Join(names, ",")
ra.totalCertsStats.Inc("Exceeded", 1)
ra.log.Info(fmt.Sprintf("Rate limit exceeded, TotalCertificates, regID: %d, domains: %s, totalIssued: %d", regID, domains, totalIssued))
return core.RateLimitedError("Global certificate issuance limit reached. Try again in an hour.")
}
ra.totalCertsStats.Inc("Pass", 1)
}
certNameLimits := ra.rlPolicies.CertificatesPerName()

View File

@ -201,8 +201,8 @@ func initAuthorities(t *testing.T) (*DummyValidationAuthority, *sa.SQLStorageAut
test.AssertNotError(t, err, "Failed to unmarshal JWK")
fc := clock.NewFake()
// Advance past epoch
fc.Add(360 * 24 * time.Hour)
// Set to some non-zero time.
fc.Set(time.Date(2015, 3, 4, 5, 0, 0, 0, time.UTC))
dbMap, err := sa.NewDbMap(vars.DBConnSA, 0)
if err != nil {
@ -834,6 +834,10 @@ func TestNewCertificate(t *testing.T) {
CSR: ExampleCSR,
}
if err := ra.updateIssuedCount(); err != nil {
t.Fatal("Updating issuance count:", err)
}
cert, err := ra.NewCertificate(ctx, certRequest, Registration.ID)
test.AssertNotError(t, err, "Failed to issue certificate")
@ -877,6 +881,13 @@ func TestTotalCertRateLimit(t *testing.T) {
CSR: ExampleCSR,
}
_, err = ra.NewCertificate(ctx, certRequest, Registration.ID)
test.AssertError(t, err, "Expected to fail issuance when updateIssuedCount not yet called")
if err := ra.updateIssuedCount(); err != nil {
t.Fatal("Updating issuance count:", err)
}
// TODO(jsha): Since we're using a real SA rather than a mock, we call
// NewCertificate twice and insert the first result into the SA. Instead we
// should mock out the SA and have it return the cert count that we want.
@ -886,9 +897,16 @@ func TestTotalCertRateLimit(t *testing.T) {
test.AssertNotError(t, err, "Failed to store certificate")
fc.Add(time.Hour)
if err := ra.updateIssuedCount(); err != nil {
t.Fatal("Updating issuance count:", err)
}
_, err = ra.NewCertificate(ctx, certRequest, Registration.ID)
test.AssertError(t, err, "Total certificate rate limit failed")
fc.Add(time.Hour)
_, err = ra.NewCertificate(ctx, certRequest, Registration.ID)
test.AssertError(t, err, "Expected to fail issuance when updateIssuedCount too long out of date")
}
func TestAuthzRateLimiting(t *testing.T) {