Move RPCMonitor log to the RPCClient and do the collect natively

This commit is contained in:
Roland Shoemaker 2015-09-10 12:48:35 -07:00
parent af8299d607
commit 00905ac07a
12 changed files with 33 additions and 177 deletions

View File

@ -19,7 +19,6 @@ import (
"github.com/letsencrypt/boulder/analysis"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/rpc"
)
@ -96,12 +95,8 @@ func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.St
cmd.FailOnError(err, "Could not subscribe to queue")
}
rpcMonitor := metrics.NewRPCMonitor(stats)
// Run forever.
for d := range deliveries {
go rpcMonitor.TimeDelivery(d)
// Pass each message to the Analysis Engine
err = ae.ProcessMessage(d)
if err != nil {

View File

@ -51,7 +51,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.
ch, err := rpc.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
raRPC, err := rpc.NewAmqpRPCClient("revoker->RA", c.AMQP.RA.Server, ch)
raRPC, err := rpc.NewAmqpRPCClient("revoker->RA", c.AMQP.RA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
@ -60,7 +60,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.
dbMap, err := sa.NewDbMap(c.Revoker.DBConnect)
cmd.FailOnError(err, "Couldn't setup database connection")
saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch)
saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)

View File

@ -51,7 +51,7 @@ func main() {
go cmd.ProfileCmd("CA", stats)
connectionHandler := func(srv *rpc.AmqpRPCServer) {
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel)
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)

View File

@ -53,13 +53,13 @@ func main() {
go cmd.ProfileCmd("RA", stats)
connectionHandler := func(srv *rpc.AmqpRPCServer) {
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel)
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel, stats)
cmd.FailOnError(err, "Unable to create RPC client")
caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel)
caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel, stats)
cmd.FailOnError(err, "Unable to create RPC client")
saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel)
saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel, stats)
cmd.FailOnError(err, "Unable to create RPC client")
vac, err := rpc.NewValidationAuthorityClient(vaRPC)

View File

@ -57,7 +57,7 @@ func main() {
vai.UserAgent = c.VA.UserAgent
connectionHandler := func(srv *rpc.AmqpRPCServer) {
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel)
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)

View File

@ -21,17 +21,17 @@ import (
"github.com/letsencrypt/boulder/wfe"
)
func setupWFE(c cmd.Config, logger *blog.AuditLogger) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) {
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) {
ch, err := rpc.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
logger.Info(" [!] Connected to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch)
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch)
saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
@ -75,7 +75,7 @@ func main() {
wfe, err := wfe.NewWebFrontEndImpl(stats)
cmd.FailOnError(err, "Unable to create WFE")
rac, sac, closeChan := setupWFE(c, auditlogger)
rac, sac, closeChan := setupWFE(c, auditlogger, stats)
wfe.RA = &rac
wfe.SA = &sac
wfe.SubscriberAgreementURL = c.SubscriberAgreementURL
@ -102,7 +102,7 @@ func main() {
for err := range closeChan {
auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err))
time.Sleep(time.Second * 5)
rac, sac, closeChan = setupWFE(c, auditlogger)
rac, sac, closeChan = setupWFE(c, auditlogger, stats)
wfe.RA = &rac
wfe.SA = &sac
}

View File

@ -247,7 +247,7 @@ func main() {
ch, err := rpc.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch)
saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)

View File

@ -36,13 +36,13 @@ type OCSPUpdater struct {
dbMap *gorp.DbMap
}
func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) {
func setupClients(c cmd.Config, stats statsd.Statter) (rpc.CertificateAuthorityClient, chan *amqp.Error) {
ch, err := rpc.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch)
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch, stats)
cmd.FailOnError(err, "Unable to create RPC client")
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
@ -217,7 +217,7 @@ func main() {
dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect)
cmd.FailOnError(err, "Could not connect to database")
cac, closeChan := setupClients(c)
cac, closeChan := setupClients(c, stats)
go func() {
// Abort if we disconnect from AMQP

View File

@ -37,7 +37,7 @@ This list is split up into metric topics with the names of the clients that subm
[gauge] Boulder.RPC.CallsWaiting
[timing] Boulder.RPC.ResponseTime.{RPC method name}
[timing] Boulder.RPC.Latency.{RPC method name}
```
* HTTP activity (`cmd/boulder-wfe` + `cmd/ocsp-responder`)

View File

@ -6,7 +6,6 @@
package metrics
import (
"encoding/json"
"fmt"
"net"
"net/http"
@ -17,9 +16,6 @@ import (
"github.com/jmhodges/clock"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/rpc"
)
// HTTPMonitor stores some server state
@ -100,103 +96,3 @@ type RPCMonitor struct {
stats statsd.Statter
clock clock.Clock
}
// NewRPCMonitor returns a new initialized RPCMonitor and starts a goroutine
// to cleanup timeouts from the delivery map
func NewRPCMonitor(stats statsd.Statter) RPCMonitor {
r := RPCMonitor{
clock: clock.Default(),
stats: stats,
deliveryTimings: make(map[string]time.Time),
dtMu: &sync.RWMutex{},
}
go func() {
c := time.Tick(time.Second * 5)
for _ = range c {
if t := r.cleanup(); t > 0 {
stats.Inc("RPC.Timeouts", t, 1.0)
}
}
}()
return r
}
func (r *RPCMonitor) size() int {
r.dtMu.RLock()
defer r.dtMu.RUnlock()
return len(r.deliveryTimings)
}
func (r *RPCMonitor) get(id string) (time.Time, bool) {
r.dtMu.RLock()
defer r.dtMu.RUnlock()
timing, present := r.deliveryTimings[id]
return timing, present
}
func (r *RPCMonitor) add(id string) {
now := r.clock.Now()
r.dtMu.Lock()
defer r.dtMu.Unlock()
r.deliveryTimings[id] = now
}
func (r *RPCMonitor) delete(id string) {
r.dtMu.Lock()
defer r.dtMu.Unlock()
delete(r.deliveryTimings, id)
}
func (r *RPCMonitor) cleanup() (removed int64) {
checkTime := r.clock.Now().Add(-time.Second * 10)
r.dtMu.RLock()
defer r.dtMu.RUnlock()
for k, v := range r.deliveryTimings {
if checkTime.After(v) {
// Give up read lock in order to let delete acquire the write lock
r.dtMu.RUnlock()
// If the delivery has been in the map for more than 10 seconds
// it has timed out, delete it so the map doesn't grow
// indefinitely.
r.delete(k)
// Re-acuqire read lock
r.dtMu.RLock()
removed++
}
}
return removed
}
// TimeDelivery takes a single RPC delivery and provides metrics to StatsD about it
func (r *RPCMonitor) TimeDelivery(d amqp.Delivery) {
// If d is a call add to deliveryTimings and increment openCalls, if it is a
// response then get time.Since original call from deliveryTiming, send timing metric, and
// decrement openCalls, in both cases send the gauge RpcCallsWaiting and increment the counter
// RpcTraffic with the byte length of the RPC body.
r.stats.Inc("RPC.Traffic", int64(len(d.Body)), 1.0)
r.stats.Gauge("RPC.CallsWaiting", int64(r.size()), 1.0)
if d.ReplyTo != "" {
r.add(fmt.Sprintf("%s:%s", d.CorrelationId, d.ReplyTo))
} else {
rpcSent, found := r.get(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
if !found {
r.stats.Inc("RPC.Rate.Unknown", 1, 1.0)
return
}
respTime := time.Since(rpcSent)
r.delete(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
// Check if the call failed
state := "Success"
var resp struct {
Error rpc.RPCError
}
json.Unmarshal(d.Body, &resp)
if resp.Error.Value != "" {
state = "Error"
}
r.stats.Inc(fmt.Sprintf("RPC.Rate.%s", state), 1, 1.0)
r.stats.TimingDuration(fmt.Sprintf("RPC.ResponseTime.%s.%s", d.Type, state), respTime, 1.0)
}
}

View File

@ -4,51 +4,3 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package metrics
import (
"sync"
"testing"
"time"
"github.com/jmhodges/clock"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/test"
)
func TestRPCMonitor(t *testing.T) {
stats, _ := statsd.NewNoopClient(nil)
fc := clock.NewFake()
rm := RPCMonitor{
stats: stats,
deliveryTimings: make(map[string]time.Time),
dtMu: &sync.RWMutex{},
clock: fc,
}
rm.add("test-a")
test.AssertEquals(t, rm.size(), 1)
dTime, present := rm.get("test-a")
test.Assert(t, present, "Couldn't find delivery timing")
test.Assert(t, dTime.Equal(rm.clock.Now()), "Delivery time was in the future")
rm.delete("test-a")
test.AssertEquals(t, rm.size(), 0)
// Wait for test-b to timeout and manually call cleanup
rm.add("test-b")
fc.Add(time.Second * 11)
test.AssertEquals(t, int(rm.cleanup()), 1)
test.AssertEquals(t, rm.size(), 0)
rm.TimeDelivery(amqp.Delivery{
CorrelationId: "a",
ReplyTo: "b",
})
test.AssertEquals(t, rm.size(), 1)
rm.TimeDelivery(amqp.Delivery{
CorrelationId: "a",
RoutingKey: "b",
Body: []byte("{}"),
})
test.AssertEquals(t, rm.size(), 0)
}

View File

@ -20,6 +20,7 @@ import (
"syscall"
"time"
"github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
@ -276,12 +277,12 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
log.Info("AMQPS: Loading TLS Options.")
if strings.HasPrefix(conf.AMQP.Server, "amqps") == false {
err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true.")
err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true")
return nil, err
}
if conf.AMQP.TLS == nil {
err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true.")
err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true")
return nil, err
}
@ -477,10 +478,12 @@ type AmqpRPCCLient struct {
mu sync.Mutex
pending map[string]chan []byte
stats statsd.Statter
}
// NewAmqpRPCClient constructs an RPC client using AMQP
func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel) (rpc *AmqpRPCCLient, err error) {
func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
@ -495,6 +498,7 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Chann
pending: make(map[string]chan []byte),
timeout: 10 * time.Second,
log: blog.GetAuditLogger(),
stats: stats,
}
// Subscribe to the response queue and dispatch
@ -565,6 +569,11 @@ func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte {
// DispatchSync sends a body to the destination, and blocks waiting on a response.
func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []byte, err error) {
rpc.stats.Inc(fmt.Sprintf("RPC.Rate.%s", method), 1, 1.0)
rpc.stats.Inc("RPC.Traffic", int64(len(body)), 1.0)
rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0)
defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0)
callStarted := time.Now()
select {
case jsonResponse := <-rpc.Dispatch(method, body):
var rpcResponse RPCResponse
@ -574,11 +583,15 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
}
err = unwrapError(rpcResponse.Error)
if err != nil {
rpc.stats.Inc(fmt.Sprintf("RPC.Latency.%s.Error", method), 1, 1.0)
return
}
rpc.stats.Inc("RPC.Rate.Success", 1, 1.0)
rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Success", method), time.Since(callStarted), 1.0)
response = rpcResponse.ReturnVal
return
case <-time.After(rpc.timeout):
rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0)
rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method))
err = errors.New("AMQP-RPC timeout")
return