Add stats to Publisher (#2083)

Fixes #1576.

Adds a new package mock_metrics, with code generated by gomock, in order to test the change.
Modifies publisher.New to take a metrics.Scope and an SA, and unexport SA.
Moves core of submission loop into a separate function, singleLogSubmit, which can return an error rather than using the continue keyword. This reduces repetition of AuditErr lines, and makes it easier to put error statting in one place.
This commit is contained in:
Jacob Hoffman-Andrews 2016-08-17 16:25:33 -07:00 committed by Roland Bracewell Shoemaker
parent 8d8263ac22
commit 0543691d9e
5 changed files with 268 additions and 53 deletions

View File

@ -71,16 +71,24 @@ func main() {
bundle = append(bundle, ct.ASN1Cert(cert.Raw))
}
pubi := publisher.New(bundle, logs, c.Publisher.SubmissionTimeout.Duration, logger)
amqpConf := c.Publisher.AMQP
sa, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
scope := metrics.NewStatsdScope(stats, "Publisher")
pubi := publisher.New(
bundle,
logs,
c.Publisher.SubmissionTimeout.Duration,
logger,
scope,
sa)
go cmd.ProfileCmd("Publisher", stats)
amqpConf := c.Publisher.AMQP
pubi.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
if c.Publisher.GRPC != nil {
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, metrics.NewStatsdScope(stats, "Publisher"))
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, scope)
cmd.FailOnError(err, "Failed to setup gRPC server")
gw := bgrpc.NewPublisherServerWrapper(pubi)
pubPB.RegisterPublisherServer(s, gw)

View File

@ -0,0 +1,3 @@
package mock_metrics
//go:generate mockgen -package mock_metrics -destination ./mock_scope.go github.com/letsencrypt/boulder/metrics Scope

View File

@ -0,0 +1,145 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/letsencrypt/boulder/metrics (interfaces: Scope)
package mock_metrics
import (
gomock "github.com/golang/mock/gomock"
metrics "github.com/letsencrypt/boulder/metrics"
time "time"
)
// Mock of Scope interface
type MockScope struct {
ctrl *gomock.Controller
recorder *_MockScopeRecorder
}
// Recorder for MockScope (not exported)
type _MockScopeRecorder struct {
mock *MockScope
}
func NewMockScope(ctrl *gomock.Controller) *MockScope {
mock := &MockScope{ctrl: ctrl}
mock.recorder = &_MockScopeRecorder{mock}
return mock
}
func (_m *MockScope) EXPECT() *_MockScopeRecorder {
return _m.recorder
}
func (_m *MockScope) Dec(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Dec", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Dec(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Dec", arg0, arg1)
}
func (_m *MockScope) Gauge(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Gauge", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Gauge(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Gauge", arg0, arg1)
}
func (_m *MockScope) GaugeDelta(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "GaugeDelta", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) GaugeDelta(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GaugeDelta", arg0, arg1)
}
func (_m *MockScope) Inc(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Inc", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Inc(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Inc", arg0, arg1)
}
func (_m *MockScope) NewScope(_param0 ...string) metrics.Scope {
_s := []interface{}{}
for _, _x := range _param0 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "NewScope", _s...)
ret0, _ := ret[0].(metrics.Scope)
return ret0
}
func (_mr *_MockScopeRecorder) NewScope(arg0 ...interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "NewScope", arg0...)
}
func (_m *MockScope) Raw(_param0 string, _param1 string) error {
ret := _m.ctrl.Call(_m, "Raw", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Raw(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Raw", arg0, arg1)
}
func (_m *MockScope) Scope() string {
ret := _m.ctrl.Call(_m, "Scope")
ret0, _ := ret[0].(string)
return ret0
}
func (_mr *_MockScopeRecorder) Scope() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Scope")
}
func (_m *MockScope) Set(_param0 string, _param1 string) error {
ret := _m.ctrl.Call(_m, "Set", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Set(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Set", arg0, arg1)
}
func (_m *MockScope) SetInt(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "SetInt", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) SetInt(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetInt", arg0, arg1)
}
func (_m *MockScope) Timing(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Timing", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Timing(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Timing", arg0, arg1)
}
func (_m *MockScope) TimingDuration(_param0 string, _param1 time.Duration) error {
ret := _m.ctrl.Call(_m, "TimingDuration", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) TimingDuration(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "TimingDuration", arg0, arg1)
}

View File

@ -5,6 +5,7 @@ import (
"encoding/base64"
"fmt"
"net/http"
"net/url"
"strings"
"time"
@ -14,21 +15,25 @@ import (
"github.com/letsencrypt/boulder/core"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
)
// Log contains the CT client and signature verifier for a particular CT log
type Log struct {
uri string
statName string
client *ctClient.LogClient
verifier *ct.SignatureVerifier
}
// NewLog returns an initialized Log struct
func NewLog(uri, b64PK string) (*Log, error) {
if strings.HasSuffix(uri, "/") {
uri = uri[0 : len(uri)-1]
url, err := url.Parse(uri)
if err != nil {
return nil, err
}
client := ctClient.New(uri, nil)
url.Path = strings.TrimSuffix(url.Path, "/")
client := ctClient.New(url.String(), nil)
pkBytes, err := base64.StdEncoding.DecodeString(b64PK)
if err != nil {
@ -44,7 +49,16 @@ func NewLog(uri, b64PK string) (*Log, error) {
return nil, err
}
return &Log{uri, client, verifier}, nil
// Replace slashes with dots for statsd logging
sanitizedPath := strings.TrimPrefix(url.Path, "/")
sanitizedPath = strings.Replace(sanitizedPath, "/", ".", -1)
return &Log{
uri: uri,
statName: fmt.Sprintf("%s.%s", url.Host, sanitizedPath),
client: client,
verifier: verifier,
}, nil
}
type ctSubmissionRequest struct {
@ -54,17 +68,25 @@ type ctSubmissionRequest struct {
// Impl defines a Publisher
type Impl struct {
log blog.Logger
stats metrics.Scope
client *http.Client
issuerBundle []ct.ASN1Cert
ctLogs []*Log
submissionTimeout time.Duration
SA core.StorageAuthority
sa core.StorageAuthority
}
// New creates a Publisher that will submit certificates
// to any CT logs configured in CTConfig
func New(bundle []ct.ASN1Cert, logs []*Log, submissionTimeout time.Duration, logger blog.Logger) *Impl {
func New(
bundle []ct.ASN1Cert,
logs []*Log,
submissionTimeout time.Duration,
logger blog.Logger,
stats metrics.Scope,
sa core.StorageAuthority,
) *Impl {
if submissionTimeout == 0 {
submissionTimeout = time.Hour * 12
}
@ -73,6 +95,8 @@ func New(bundle []ct.ASN1Cert, logs []*Log, submissionTimeout time.Duration, log
issuerBundle: bundle,
ctLogs: logs,
log: logger,
stats: stats,
sa: sa,
}
}
@ -89,43 +113,47 @@ func (pub *Impl) SubmitToCT(ctx context.Context, der []byte) error {
defer cancel()
chain := append([]ct.ASN1Cert{der}, pub.issuerBundle...)
for _, ctLog := range pub.ctLogs {
sct, err := ctLog.client.AddChainWithContext(localCtx, chain)
stats := pub.stats.NewScope(ctLog.statName)
stats.Inc("Submits", 1)
start := time.Now()
err := pub.singleLogSubmit(localCtx, chain, core.SerialToString(cert.SerialNumber), ctLog)
stats.TimingDuration("SubmitLatency", time.Now().Sub(start))
if err != nil {
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
pub.log.AuditErr(fmt.Sprintf("Failed to submit certificate to CT log at %s: %s", ctLog.uri, err))
continue
}
err = ctLog.verifier.VerifySCTSignature(*sct, ct.LogEntry{
Leaf: ct.MerkleTreeLeaf{
LeafType: ct.TimestampedEntryLeafType,
TimestampedEntry: ct.TimestampedEntry{
X509Entry: ct.ASN1Cert(der),
EntryType: ct.X509LogEntryType,
},
},
})
if err != nil {
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
pub.log.AuditErr(fmt.Sprintf("Failed to verify SCT receipt: %s", err))
continue
}
internalSCT, err := sctToInternal(sct, core.SerialToString(cert.SerialNumber))
if err != nil {
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
pub.log.AuditErr(fmt.Sprintf("Failed to convert SCT receipt: %s", err))
continue
}
err = pub.SA.AddSCTReceipt(localCtx, internalSCT)
if err != nil {
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
pub.log.AuditErr(fmt.Sprintf("Failed to store SCT receipt in database: %s", err))
continue
stats.Inc("Errors", 1)
}
}
return nil
}
func (pub *Impl) singleLogSubmit(ctx context.Context, chain []ct.ASN1Cert, serial string, ctLog *Log) error {
sct, err := ctLog.client.AddChainWithContext(ctx, chain)
if err != nil {
return err
}
err = ctLog.verifier.VerifySCTSignature(*sct, ct.LogEntry{
Leaf: ct.MerkleTreeLeaf{
LeafType: ct.TimestampedEntryLeafType,
TimestampedEntry: ct.TimestampedEntry{
X509Entry: chain[0],
EntryType: ct.X509LogEntryType,
},
},
})
if err != nil {
return err
}
internalSCT, err := sctToInternal(sct, serial)
if err != nil {
return err
}
err = pub.sa.AddSCTReceipt(ctx, internalSCT)
if err != nil {
return err
}
return nil
}

View File

@ -17,14 +17,18 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"strings"
"testing"
"time"
"github.com/golang/mock/gomock"
ct "github.com/google/certificate-transparency/go"
"github.com/jmhodges/clock"
"golang.org/x/net/context"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/metrics/mock_metrics"
"github.com/letsencrypt/boulder/mocks"
"github.com/letsencrypt/boulder/test"
)
@ -180,7 +184,7 @@ func createSignedSCT(leaf []byte, k *ecdsa.PrivateKey) string {
func logSrv(leaf []byte, k *ecdsa.PrivateKey) *httptest.Server {
sct := createSignedSCT(leaf, k)
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
m.HandleFunc("/ct/", func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var jsonReq ctSubmissionRequest
err := decoder.Decode(&jsonReq)
@ -200,7 +204,7 @@ func logSrv(leaf []byte, k *ecdsa.PrivateKey) *httptest.Server {
func errorLogSrv() *httptest.Server {
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
m.HandleFunc("/ct/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
})
@ -213,7 +217,7 @@ func retryableLogSrv(leaf []byte, k *ecdsa.PrivateKey, retries int, after *int)
hits := 0
sct := createSignedSCT(leaf, k)
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
m.HandleFunc("/ct/", func(w http.ResponseWriter, r *http.Request) {
if hits >= retries {
fmt.Fprint(w, sct)
} else {
@ -233,7 +237,7 @@ func retryableLogSrv(leaf []byte, k *ecdsa.PrivateKey, retries int, after *int)
func badLogSrv() *httptest.Server {
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
m.HandleFunc("/ct/", func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var jsonReq ctSubmissionRequest
err := decoder.Decode(&jsonReq)
@ -254,9 +258,13 @@ func badLogSrv() *httptest.Server {
func setup(t *testing.T) (*Impl, *x509.Certificate, *ecdsa.PrivateKey) {
intermediatePEM, _ := pem.Decode([]byte(testIntermediate))
pub := New(nil, nil, 0, log)
pub := New(nil,
nil,
0,
log,
metrics.NewNoopScope(),
mocks.NewStorageAuthority(clock.NewFake()))
pub.issuerBundle = append(pub.issuerBundle, ct.ASN1Cert(intermediatePEM.Bytes))
pub.SA = mocks.NewStorageAuthority(clock.NewFake())
leafPEM, _ := pem.Decode([]byte(testLeaf))
leaf, err := x509.ParseCertificate(leafPEM.Bytes)
@ -269,25 +277,34 @@ func setup(t *testing.T) (*Impl, *x509.Certificate, *ecdsa.PrivateKey) {
}
func addLog(t *testing.T, pub *Impl, port int, pubKey *ecdsa.PublicKey) {
uri := fmt.Sprintf("http://localhost:%d/", port)
uri := fmt.Sprintf("http://localhost:%d/ct", port)
der, err := x509.MarshalPKIXPublicKey(pubKey)
test.AssertNotError(t, err, "Failed to marshal key")
newLog, err := NewLog(uri, base64.StdEncoding.EncodeToString(der))
test.AssertNotError(t, err, "Couldn't create log")
test.AssertEquals(t, newLog.uri, fmt.Sprintf("http://localhost:%d", port))
test.AssertEquals(t, newLog.uri, fmt.Sprintf("http://localhost:%d/ct", port))
pub.ctLogs = append(pub.ctLogs, newLog)
}
func TestBasicSuccessful(t *testing.T) {
pub, leaf, k := setup(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
scope := mock_metrics.NewMockScope(ctrl)
pub.stats = scope
server := logSrv(leaf.Raw, k)
defer server.Close()
port, err := getPort(server)
test.AssertNotError(t, err, "Failed to get test server port")
addLog(t, pub, port, &k.PublicKey)
statName := pub.ctLogs[0].statName
log.Clear()
scope.EXPECT().NewScope(statName).Return(scope)
scope.EXPECT().Inc("Submits", int64(1)).Return(nil)
scope.EXPECT().TimingDuration("SubmitLatency", gomock.Any()).Return(nil)
err = pub.SubmitToCT(ctx, leaf.Raw)
test.AssertNotError(t, err, "Certificate submission failed")
test.AssertEquals(t, len(log.GetAllMatching("Failed to.*")), 0)
@ -295,6 +312,9 @@ func TestBasicSuccessful(t *testing.T) {
// No Intermediate
pub.issuerBundle = []ct.ASN1Cert{}
log.Clear()
scope.EXPECT().NewScope(statName).Return(scope)
scope.EXPECT().Inc("Submits", int64(1)).Return(nil)
scope.EXPECT().TimingDuration("SubmitLatency", gomock.Any()).Return(nil)
err = pub.SubmitToCT(ctx, leaf.Raw)
test.AssertNotError(t, err, "Certificate submission failed")
test.AssertEquals(t, len(log.GetAllMatching("Failed to.*")), 0)
@ -312,19 +332,30 @@ func TestGoodRetry(t *testing.T) {
log.Clear()
err = pub.SubmitToCT(ctx, leaf.Raw)
test.AssertNotError(t, err, "Certificate submission failed")
fmt.Println(strings.Join(log.GetAllMatching(".*"), "\n"))
test.AssertEquals(t, len(log.GetAllMatching("Failed to.*")), 0)
}
func TestUnexpectedError(t *testing.T) {
pub, leaf, k := setup(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
scope := mock_metrics.NewMockScope(ctrl)
pub.stats = scope
srv := errorLogSrv()
defer srv.Close()
port, err := getPort(srv)
test.AssertNotError(t, err, "Failed to get test server port")
addLog(t, pub, port, &k.PublicKey)
statName := pub.ctLogs[0].statName
log.Clear()
scope.EXPECT().NewScope(statName).Return(scope)
scope.EXPECT().Inc("Submits", int64(1)).Return(nil)
scope.EXPECT().Inc("Errors", int64(1)).Return(nil)
scope.EXPECT().TimingDuration("SubmitLatency", gomock.Any()).Return(nil)
err = pub.SubmitToCT(ctx, leaf.Raw)
test.AssertNotError(t, err, "Certificate submission failed")
test.AssertEquals(t, len(log.GetAllMatching("Failed .*http://localhost:"+strconv.Itoa(port))), 1)
@ -400,5 +431,5 @@ func TestBadServer(t *testing.T) {
log.Clear()
err = pub.SubmitToCT(ctx, leaf.Raw)
test.AssertNotError(t, err, "Certificate submission failed")
test.AssertEquals(t, len(log.GetAllMatching("Failed to verify SCT receipt")), 1)
test.AssertEquals(t, len(log.GetAllMatching("failed to verify ecdsa signature")), 1)
}