Move rpc delivery timing stuff to new metrics lib

This commit is contained in:
Roland Shoemaker 2015-08-15 22:25:52 -07:00
parent 2677c4e314
commit 370cd07bc9
3 changed files with 79 additions and 73 deletions

View File

@ -10,11 +10,8 @@ package main
// broker to look for anomalies.
import (
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
@ -22,6 +19,7 @@ import (
"github.com/letsencrypt/boulder/analysis"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/rpc"
)
@ -41,70 +39,6 @@ const (
AmqpImmediate = false
)
type timings struct {
deliveryTimings map[string]time.Time
dtMu sync.Mutex
stats statsd.Statter
}
func (t *timings) size() int {
t.dtMu.Lock()
defer t.dtMu.Unlock()
return len(t.deliveryTimings)
}
func (t *timings) get(id string) time.Time {
t.dtMu.Lock()
defer t.dtMu.Unlock()
return t.deliveryTimings[id]
}
func (t *timings) add(id string) {
t.dtMu.Lock()
defer t.dtMu.Unlock()
t.deliveryTimings[id] = time.Now()
}
func (t *timings) delete(id string) {
t.dtMu.Lock()
defer t.dtMu.Unlock()
delete(t.deliveryTimings, id)
}
func (t *timings) timeDelivery(d amqp.Delivery) {
// If d is a call add to deliveryTimings and increment openCalls, if it is a
// response then get time.Since original call from deliveryTiming, send timing metric, and
// decrement openCalls, in both cases send the gauge RpcCallsWaiting and increment the counter
// RpcTraffic with the byte length of the RPC body.
t.stats.Inc("RPC.Traffic", int64(len(d.Body)), 1.0)
t.stats.Gauge("RPC.CallsWaiting", int64(t.size()), 1.0)
if d.ReplyTo != "" {
t.add(fmt.Sprintf("%s:%s", d.CorrelationId, d.ReplyTo))
} else {
rpcSent := t.get(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
if rpcSent != *new(time.Time) {
respTime := time.Since(rpcSent)
t.delete(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
// Check if the call failed
state := "Success"
var resp struct {
Error rpc.RPCError
}
json.Unmarshal(d.Body, &resp)
if resp.Error.Value != "" {
state = "Error"
}
t.stats.Inc(fmt.Sprintf("RPC.Rate.%s", state), 1, 1.0)
t.stats.TimingDuration(fmt.Sprintf("RPC.ResponseTime.%s.%s", d.Type, state), respTime, 1.0)
} else {
}
}
}
func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.Statter) {
ae := analysisengine.NewLoggingAnalysisEngine()
@ -162,14 +96,11 @@ func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.St
cmd.FailOnError(err, "Could not subscribe to queue")
}
timings := timings{
deliveryTimings: make(map[string]time.Time),
stats: stats,
}
rpcMonitor := metrics.NewRPCMonitor(stats)
// Run forever.
for d := range deliveries {
go timings.timeDelivery(d)
go rpcMonitor.TimeDelivery(d)
// Pass each message to the Analysis Engine
err = ae.ProcessMessage(d)

View File

@ -13,10 +13,10 @@ import (
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/rpc"
"github.com/letsencrypt/boulder/wfe"
)

View File

@ -6,13 +6,18 @@
package metrics
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/rpc"
)
// HTTPMonitor stores some server state
@ -62,3 +67,73 @@ func (h *HTTPMonitor) watchAndServe(w http.ResponseWriter, r *http.Request) {
h.stats.TimingDuration(fmt.Sprintf("%s.HTTP.ResponseTime.%s.%s", h.statsPrefix, endpoint, state), cClosed, 1.0)
}
// RPCMonitor stores rpc delivery state
type RPCMonitor struct {
deliveryTimings map[string]time.Time
dtMu *sync.Mutex
stats statsd.Statter
}
// NewRPCMonitor returns a new initialized RPCMonitor
func NewRPCMonitor(stats statsd.Statter) RPCMonitor {
return RPCMonitor{stats: stats, deliveryTimings: make(map[string]time.Time), dtMu: &sync.Mutex{}}
}
func (r *RPCMonitor) size() int {
r.dtMu.Lock()
defer r.dtMu.Unlock()
return len(r.deliveryTimings)
}
func (r *RPCMonitor) get(id string) time.Time {
r.dtMu.Lock()
defer r.dtMu.Unlock()
return r.deliveryTimings[id]
}
func (r *RPCMonitor) add(id string) {
r.dtMu.Lock()
defer r.dtMu.Unlock()
r.deliveryTimings[id] = time.Now()
}
func (r *RPCMonitor) delete(id string) {
r.dtMu.Lock()
defer r.dtMu.Unlock()
delete(r.deliveryTimings, id)
}
// TimeDelivery takes a single RPC delivery and provides metrics to StatsD about it
func (r *RPCMonitor) TimeDelivery(d amqp.Delivery) {
// If d is a call add to deliveryTimings and increment openCalls, if it is a
// response then get time.Since original call from deliveryTiming, send timing metric, and
// decrement openCalls, in both cases send the gauge RpcCallsWaiting and increment the counter
// RpcTraffic with the byte length of the RPC body.
r.stats.Inc("RPC.Traffic", int64(len(d.Body)), 1.0)
r.stats.Gauge("RPC.CallsWaiting", int64(r.size()), 1.0)
if d.ReplyTo != "" {
r.add(fmt.Sprintf("%s:%s", d.CorrelationId, d.ReplyTo))
} else {
rpcSent := r.get(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
if rpcSent != *new(time.Time) {
respTime := time.Since(rpcSent)
r.delete(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey))
// Check if the call failed
state := "Success"
var resp struct {
Error rpc.RPCError
}
json.Unmarshal(d.Body, &resp)
if resp.Error.Value != "" {
state = "Error"
}
r.stats.Inc(fmt.Sprintf("RPC.Rate.%s", state), 1, 1.0)
r.stats.TimingDuration(fmt.Sprintf("RPC.ResponseTime.%s.%s", d.Type, state), respTime, 1.0)
} else {
}
}
}