diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 4fdccd97c..61f342ac4 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -22,6 +22,7 @@ import ( "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/cmd" @@ -170,6 +171,8 @@ type AmqpRPCServer struct { currentGoroutines int64 maxConcurrentRPCServerRequests int64 tooManyRequestsResponse []byte + stats statsd.Statter + clk clock.Clock } // NewAmqpRPCServer creates a new RPC server for the given queue and will begin @@ -186,12 +189,19 @@ func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64, reconnectMax = time.Minute } + stats, err := statsd.NewClient(c.Statsd.Server, c.Statsd.Prefix) + if err != nil { + return nil, err + } + return &AmqpRPCServer{ serverQueue: serverQueue, connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax), log: log, dispatchTable: make(map[string]func([]byte) ([]byte, error)), maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests, + clk: clock.Default(), + stats: stats, }, nil } @@ -426,6 +436,7 @@ func (rpc *AmqpRPCServer) Start(c cmd.Config) error { select { case msg, ok := <-rpc.connection.messages(): if ok { + rpc.stats.TimingDuration(fmt.Sprintf("RPC.MessageLag.%s", rpc.serverQueue), rpc.clk.Now().Sub(msg.Timestamp), 1.0) if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests { rpc.replyTooManyRequests(msg) break // this breaks the select, not the for diff --git a/rpc/connection.go b/rpc/connection.go index df9f9aec0..585595add 100644 --- a/rpc/connection.go +++ b/rpc/connection.go @@ -129,6 +129,7 @@ func (ac *amqpConnector) publish(queueName, corrId, expiration, replyTo, msgType Expiration: expiration, ReplyTo: replyTo, Type: msgType, + Timestamp: ac.clk.Now(), }) } diff --git a/rpc/connection_test.go b/rpc/connection_test.go index 89c5926d1..f86b19558 100644 --- a/rpc/connection_test.go +++ b/rpc/connection_test.go @@ -31,6 +31,7 @@ func setup(t *testing.T) (*amqpConnector, *MockamqpChannel, func()) { }, queueName: "fooqueue", retryTimeoutBase: time.Second, + clk: clock.NewFake(), } return &ac, mockChannel, func() { mockCtrl.Finish() } } @@ -125,6 +126,7 @@ func TestPublish(t *testing.T) { Expiration: "3000", ReplyTo: "replyTo", Type: "testMsg", + Timestamp: ac.clk.Now(), }) ac.publish("fooqueue", "03c52e", "3000", "replyTo", "testMsg", []byte("body")) }