diff --git a/cmd/activity-monitor/main.go b/cmd/activity-monitor/main.go index e3e052284..cf6db100d 100644 --- a/cmd/activity-monitor/main.go +++ b/cmd/activity-monitor/main.go @@ -19,7 +19,6 @@ import ( "github.com/letsencrypt/boulder/analysis" "github.com/letsencrypt/boulder/cmd" blog "github.com/letsencrypt/boulder/log" - "github.com/letsencrypt/boulder/metrics" "github.com/letsencrypt/boulder/rpc" ) @@ -96,12 +95,8 @@ func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.St cmd.FailOnError(err, "Could not subscribe to queue") } - rpcMonitor := metrics.NewRPCMonitor(stats) - // Run forever. for d := range deliveries { - go rpcMonitor.TimeDelivery(d) - // Pass each message to the Analysis Engine err = ae.ProcessMessage(d) if err != nil { diff --git a/cmd/admin-revoker/main.go b/cmd/admin-revoker/main.go index 3347e5cb7..d69759443 100644 --- a/cmd/admin-revoker/main.go +++ b/cmd/admin-revoker/main.go @@ -51,7 +51,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") - raRPC, err := rpc.NewAmqpRPCClient("revoker->RA", c.AMQP.RA.Server, ch) + raRPC, err := rpc.NewAmqpRPCClient("revoker->RA", c.AMQP.RA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) @@ -60,7 +60,7 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. dbMap, err := sa.NewDbMap(c.Revoker.DBConnect) cmd.FailOnError(err, "Couldn't setup database connection") - saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch) + saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index d50301878..6e9a3f7a3 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -51,7 +51,7 @@ func main() { go cmd.ProfileCmd("CA", stats) connectionHandler := func(srv *rpc.AmqpRPCServer) { - saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel) + saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index 867266b17..b09189d1c 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -53,13 +53,13 @@ func main() { go cmd.ProfileCmd("RA", stats) connectionHandler := func(srv *rpc.AmqpRPCServer) { - vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel) + vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel, stats) cmd.FailOnError(err, "Unable to create RPC client") - caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel) + caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel, stats) cmd.FailOnError(err, "Unable to create RPC client") - saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel) + saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel, stats) cmd.FailOnError(err, "Unable to create RPC client") vac, err := rpc.NewValidationAuthorityClient(vaRPC) diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index 7652a4e59..25a09aa72 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -57,7 +57,7 @@ func main() { vai.UserAgent = c.VA.UserAgent connectionHandler := func(srv *rpc.AmqpRPCServer) { - raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel) + raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel, stats) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) diff --git a/cmd/boulder-wfe/main.go b/cmd/boulder-wfe/main.go index 4be9c0138..b5b0b8455 100644 --- a/cmd/boulder-wfe/main.go +++ b/cmd/boulder-wfe/main.go @@ -21,17 +21,17 @@ import ( "github.com/letsencrypt/boulder/wfe" ) -func setupWFE(c cmd.Config, logger *blog.AuditLogger) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) { +func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) { ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") logger.Info(" [!] Connected to AMQP") closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch) + raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") - saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch) + saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) @@ -75,7 +75,7 @@ func main() { wfe, err := wfe.NewWebFrontEndImpl(stats) cmd.FailOnError(err, "Unable to create WFE") - rac, sac, closeChan := setupWFE(c, auditlogger) + rac, sac, closeChan := setupWFE(c, auditlogger, stats) wfe.RA = &rac wfe.SA = &sac wfe.SubscriberAgreementURL = c.SubscriberAgreementURL @@ -102,7 +102,7 @@ func main() { for err := range closeChan { auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) time.Sleep(time.Second * 5) - rac, sac, closeChan = setupWFE(c, auditlogger) + rac, sac, closeChan = setupWFE(c, auditlogger, stats) wfe.RA = &rac wfe.SA = &sac } diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index 4ba5ad7fa..685e111fd 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -247,7 +247,7 @@ func main() { ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") - saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch) + saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index 5d79813ac..4368c8169 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -36,13 +36,13 @@ type OCSPUpdater struct { dbMap *gorp.DbMap } -func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) { +func setupClients(c cmd.Config, stats statsd.Statter) (rpc.CertificateAuthorityClient, chan *amqp.Error) { ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch) + caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch, stats) cmd.FailOnError(err, "Unable to create RPC client") cac, err := rpc.NewCertificateAuthorityClient(caRPC) @@ -217,7 +217,7 @@ func main() { dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect) cmd.FailOnError(err, "Could not connect to database") - cac, closeChan := setupClients(c) + cac, closeChan := setupClients(c, stats) go func() { // Abort if we disconnect from AMQP diff --git a/docs/metrics/README.md b/docs/metrics/README.md index 133303555..2596baad8 100644 --- a/docs/metrics/README.md +++ b/docs/metrics/README.md @@ -37,7 +37,7 @@ This list is split up into metric topics with the names of the clients that subm [gauge] Boulder.RPC.CallsWaiting - [timing] Boulder.RPC.ResponseTime.{RPC method name} + [timing] Boulder.RPC.Latency.{RPC method name} ``` * HTTP activity (`cmd/boulder-wfe` + `cmd/ocsp-responder`) diff --git a/metrics/metrics.go b/metrics/metrics.go index af066f901..980973ea2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -6,7 +6,6 @@ package metrics import ( - "encoding/json" "fmt" "net" "net/http" @@ -17,9 +16,6 @@ import ( "github.com/jmhodges/clock" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" - - "github.com/letsencrypt/boulder/rpc" ) // HTTPMonitor stores some server state @@ -100,103 +96,3 @@ type RPCMonitor struct { stats statsd.Statter clock clock.Clock } - -// NewRPCMonitor returns a new initialized RPCMonitor and starts a goroutine -// to cleanup timeouts from the delivery map -func NewRPCMonitor(stats statsd.Statter) RPCMonitor { - r := RPCMonitor{ - clock: clock.Default(), - stats: stats, - deliveryTimings: make(map[string]time.Time), - dtMu: &sync.RWMutex{}, - } - go func() { - c := time.Tick(time.Second * 5) - for _ = range c { - if t := r.cleanup(); t > 0 { - stats.Inc("RPC.Timeouts", t, 1.0) - } - } - }() - return r -} - -func (r *RPCMonitor) size() int { - r.dtMu.RLock() - defer r.dtMu.RUnlock() - return len(r.deliveryTimings) -} - -func (r *RPCMonitor) get(id string) (time.Time, bool) { - r.dtMu.RLock() - defer r.dtMu.RUnlock() - timing, present := r.deliveryTimings[id] - return timing, present -} - -func (r *RPCMonitor) add(id string) { - now := r.clock.Now() - r.dtMu.Lock() - defer r.dtMu.Unlock() - r.deliveryTimings[id] = now -} - -func (r *RPCMonitor) delete(id string) { - r.dtMu.Lock() - defer r.dtMu.Unlock() - delete(r.deliveryTimings, id) -} - -func (r *RPCMonitor) cleanup() (removed int64) { - checkTime := r.clock.Now().Add(-time.Second * 10) - r.dtMu.RLock() - defer r.dtMu.RUnlock() - for k, v := range r.deliveryTimings { - if checkTime.After(v) { - // Give up read lock in order to let delete acquire the write lock - r.dtMu.RUnlock() - // If the delivery has been in the map for more than 10 seconds - // it has timed out, delete it so the map doesn't grow - // indefinitely. - r.delete(k) - // Re-acuqire read lock - r.dtMu.RLock() - removed++ - } - } - return removed -} - -// TimeDelivery takes a single RPC delivery and provides metrics to StatsD about it -func (r *RPCMonitor) TimeDelivery(d amqp.Delivery) { - // If d is a call add to deliveryTimings and increment openCalls, if it is a - // response then get time.Since original call from deliveryTiming, send timing metric, and - // decrement openCalls, in both cases send the gauge RpcCallsWaiting and increment the counter - // RpcTraffic with the byte length of the RPC body. - r.stats.Inc("RPC.Traffic", int64(len(d.Body)), 1.0) - r.stats.Gauge("RPC.CallsWaiting", int64(r.size()), 1.0) - - if d.ReplyTo != "" { - r.add(fmt.Sprintf("%s:%s", d.CorrelationId, d.ReplyTo)) - } else { - rpcSent, found := r.get(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey)) - if !found { - r.stats.Inc("RPC.Rate.Unknown", 1, 1.0) - return - } - respTime := time.Since(rpcSent) - r.delete(fmt.Sprintf("%s:%s", d.CorrelationId, d.RoutingKey)) - - // Check if the call failed - state := "Success" - var resp struct { - Error rpc.RPCError - } - json.Unmarshal(d.Body, &resp) - if resp.Error.Value != "" { - state = "Error" - } - r.stats.Inc(fmt.Sprintf("RPC.Rate.%s", state), 1, 1.0) - r.stats.TimingDuration(fmt.Sprintf("RPC.ResponseTime.%s.%s", d.Type, state), respTime, 1.0) - } -} diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 940ade688..262f036eb 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -4,51 +4,3 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. package metrics - -import ( - "sync" - "testing" - "time" - - "github.com/jmhodges/clock" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" - - "github.com/letsencrypt/boulder/test" -) - -func TestRPCMonitor(t *testing.T) { - stats, _ := statsd.NewNoopClient(nil) - fc := clock.NewFake() - rm := RPCMonitor{ - stats: stats, - deliveryTimings: make(map[string]time.Time), - dtMu: &sync.RWMutex{}, - clock: fc, - } - - rm.add("test-a") - test.AssertEquals(t, rm.size(), 1) - dTime, present := rm.get("test-a") - test.Assert(t, present, "Couldn't find delivery timing") - test.Assert(t, dTime.Equal(rm.clock.Now()), "Delivery time was in the future") - rm.delete("test-a") - test.AssertEquals(t, rm.size(), 0) - // Wait for test-b to timeout and manually call cleanup - rm.add("test-b") - fc.Add(time.Second * 11) - test.AssertEquals(t, int(rm.cleanup()), 1) - test.AssertEquals(t, rm.size(), 0) - - rm.TimeDelivery(amqp.Delivery{ - CorrelationId: "a", - ReplyTo: "b", - }) - test.AssertEquals(t, rm.size(), 1) - rm.TimeDelivery(amqp.Delivery{ - CorrelationId: "a", - RoutingKey: "b", - Body: []byte("{}"), - }) - test.AssertEquals(t, rm.size(), 0) -} diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index ba926fa98..2a1a5f44b 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -20,6 +20,7 @@ import ( "syscall" "time" + "github.com/cactus/go-statsd-client/statsd" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/core" @@ -276,12 +277,12 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { log.Info("AMQPS: Loading TLS Options.") if strings.HasPrefix(conf.AMQP.Server, "amqps") == false { - err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true.") + err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true") return nil, err } if conf.AMQP.TLS == nil { - err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true.") + err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true") return nil, err } @@ -477,10 +478,12 @@ type AmqpRPCCLient struct { mu sync.Mutex pending map[string]chan []byte + + stats statsd.Statter } // NewAmqpRPCClient constructs an RPC client using AMQP -func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel) (rpc *AmqpRPCCLient, err error) { +func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Channel, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) { hostname, err := os.Hostname() if err != nil { return nil, err @@ -495,6 +498,7 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Chann pending: make(map[string]chan []byte), timeout: 10 * time.Second, log: blog.GetAuditLogger(), + stats: stats, } // Subscribe to the response queue and dispatch @@ -565,6 +569,11 @@ func (rpc *AmqpRPCCLient) Dispatch(method string, body []byte) chan []byte { // DispatchSync sends a body to the destination, and blocks waiting on a response. func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []byte, err error) { + rpc.stats.Inc(fmt.Sprintf("RPC.Rate.%s", method), 1, 1.0) + rpc.stats.Inc("RPC.Traffic", int64(len(body)), 1.0) + rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0) + defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0) + callStarted := time.Now() select { case jsonResponse := <-rpc.Dispatch(method, body): var rpcResponse RPCResponse @@ -574,11 +583,15 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b } err = unwrapError(rpcResponse.Error) if err != nil { + rpc.stats.Inc(fmt.Sprintf("RPC.Latency.%s.Error", method), 1, 1.0) return } + rpc.stats.Inc("RPC.Rate.Success", 1, 1.0) + rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Success", method), time.Since(callStarted), 1.0) response = rpcResponse.ReturnVal return case <-time.After(rpc.timeout): + rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0) rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method)) err = errors.New("AMQP-RPC timeout") return