Merge pull request #1115 from letsencrypt/rpc-lag

Add metric for RPC message lag
This commit is contained in:
Roland Bracewell Shoemaker 2015-11-09 17:43:22 -08:00
commit 48bd80d14e
3 changed files with 14 additions and 0 deletions

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/cmd"
@ -170,6 +171,8 @@ type AmqpRPCServer struct {
currentGoroutines int64
maxConcurrentRPCServerRequests int64
tooManyRequestsResponse []byte
stats statsd.Statter
clk clock.Clock
}
// NewAmqpRPCServer creates a new RPC server for the given queue and will begin
@ -186,12 +189,19 @@ func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64,
reconnectMax = time.Minute
}
stats, err := statsd.NewClient(c.Statsd.Server, c.Statsd.Prefix)
if err != nil {
return nil, err
}
return &AmqpRPCServer{
serverQueue: serverQueue,
connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax),
log: log,
dispatchTable: make(map[string]func([]byte) ([]byte, error)),
maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests,
clk: clock.Default(),
stats: stats,
}, nil
}
@ -426,6 +436,7 @@ func (rpc *AmqpRPCServer) Start(c cmd.Config) error {
select {
case msg, ok := <-rpc.connection.messages():
if ok {
rpc.stats.TimingDuration(fmt.Sprintf("RPC.MessageLag.%s", rpc.serverQueue), rpc.clk.Now().Sub(msg.Timestamp), 1.0)
if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests {
rpc.replyTooManyRequests(msg)
break // this breaks the select, not the for

View File

@ -129,6 +129,7 @@ func (ac *amqpConnector) publish(queueName, corrId, expiration, replyTo, msgType
Expiration: expiration,
ReplyTo: replyTo,
Type: msgType,
Timestamp: ac.clk.Now(),
})
}

View File

@ -31,6 +31,7 @@ func setup(t *testing.T) (*amqpConnector, *MockamqpChannel, func()) {
},
queueName: "fooqueue",
retryTimeoutBase: time.Second,
clk: clock.NewFake(),
}
return &ac, mockChannel, func() { mockCtrl.Finish() }
}
@ -125,6 +126,7 @@ func TestPublish(t *testing.T) {
Expiration: "3000",
ReplyTo: "replyTo",
Type: "testMsg",
Timestamp: ac.clk.Now(),
})
ac.publish("fooqueue", "03c52e", "3000", "replyTo", "testMsg", []byte("body"))
}