Let Activity Monitor re-bind to # on reconnect.
Fixes https://github.com/letsencrypt/boulder/issues/1212. This exposes a new constructor in amqp-rpc.go specifically for ActivityMonitor, which overrides the normal routingKey to be the wildcard "#". It also adds an expvar for the number of messages processed in ActivityMonitor, and adds an integration test case that checks that ActivityMonitor has received more than zero messages.
This commit is contained in:
parent
aa04a96e93
commit
398ba111c4
|
@ -10,6 +10,8 @@ package main
|
|||
// broker to look for anomalies.
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
|
||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
|
||||
|
||||
|
@ -26,11 +28,14 @@ func main() {
|
|||
go cmd.DebugServer(c.ActivityMonitor.DebugAddr)
|
||||
|
||||
amqpConf := c.ActivityMonitor.AMQP
|
||||
server, err := rpc.NewAmqpRPCServer(amqpConf, 0, stats)
|
||||
server, err := rpc.NewMonitorServer(amqpConf, 0, stats)
|
||||
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||
|
||||
ae := analysisengine.NewLoggingAnalysisEngine()
|
||||
|
||||
messages := expvar.NewInt("messages")
|
||||
server.HandleDeliveries(rpc.DeliveryHandler(func(d amqp.Delivery) {
|
||||
messages.Add(1)
|
||||
ae.ProcessMessage(d)
|
||||
}))
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ const (
|
|||
)
|
||||
|
||||
// A simplified way to declare and subscribe to an AMQP queue
|
||||
func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
|
||||
func amqpSubscribe(ch amqpChannel, name, routingKey string) (<-chan amqp.Delivery, error) {
|
||||
var err error
|
||||
|
||||
_, err = ch.QueueDeclare(
|
||||
|
@ -73,8 +73,6 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
|
|||
return nil, fmt.Errorf("could not declare queue: %s", err)
|
||||
}
|
||||
|
||||
routingKey := name
|
||||
|
||||
err = ch.QueueBind(
|
||||
name,
|
||||
routingKey,
|
||||
|
@ -131,6 +129,20 @@ type AmqpRPCServer struct {
|
|||
clk clock.Clock
|
||||
}
|
||||
|
||||
const wildcardRoutingKey = "#"
|
||||
|
||||
// NewMonitorServer creates an AmqpRPCServer that binds its queue to the
|
||||
// wildcard routing key instead of the default of binding to the queue name.
|
||||
// This allows Activity Monitor to observe all messages sent to the exchange.
|
||||
func NewMonitorServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) {
|
||||
server, err := NewAmqpRPCServer(amqpConf, maxConcurrentRPCServerRequests, stats)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
server.connection.routingKey = wildcardRoutingKey
|
||||
return server, nil
|
||||
}
|
||||
|
||||
// NewAmqpRPCServer creates a new RPC server for the given queue and will begin
|
||||
// consuming requests from the queue. To start the server you must call Start().
|
||||
func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) {
|
||||
|
|
|
@ -20,6 +20,7 @@ func newAMQPConnector(
|
|||
) *amqpConnector {
|
||||
return &amqpConnector{
|
||||
queueName: queueName,
|
||||
routingKey: queueName,
|
||||
chMaker: defaultChannelMaker{},
|
||||
clk: clock.Default(),
|
||||
retryTimeoutBase: retryTimeoutBase,
|
||||
|
@ -42,10 +43,13 @@ func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, err
|
|||
// queue, plus appropriate locking for its members. It provides reconnect logic,
|
||||
// and allows publishing via the channel onto an arbitrary queue.
|
||||
type amqpConnector struct {
|
||||
mu sync.RWMutex
|
||||
chMaker channelMaker
|
||||
channel amqpChannel
|
||||
queueName string
|
||||
mu sync.RWMutex
|
||||
chMaker channelMaker
|
||||
channel amqpChannel
|
||||
queueName string
|
||||
// Usually this is the same as queueName, except for Activity Monitor, which
|
||||
// sets it to "#".
|
||||
routingKey string
|
||||
closeChan chan *amqp.Error
|
||||
msgs <-chan amqp.Delivery
|
||||
retryTimeoutBase time.Duration
|
||||
|
@ -73,7 +77,7 @@ func (ac *amqpConnector) connect(config *cmd.AMQPConfig) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err)
|
||||
}
|
||||
msgs, err := amqpSubscribe(channel, ac.queueName)
|
||||
msgs, err := amqpSubscribe(channel, ac.queueName, ac.routingKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("queue subscribe failed for %s: %s", ac.queueName, err)
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ func setup(t *testing.T) (*amqpConnector, *MockamqpChannel, func()) {
|
|||
channel: mockChannel,
|
||||
},
|
||||
queueName: "fooqueue",
|
||||
routingKey: "fooqueue",
|
||||
retryTimeoutBase: time.Second,
|
||||
clk: clock.NewFake(),
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import argparse
|
||||
import atexit
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
@ -9,8 +10,8 @@ import socket
|
|||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib
|
||||
import time
|
||||
import urllib
|
||||
import urllib2
|
||||
|
||||
import startservers
|
||||
|
@ -212,6 +213,11 @@ def run_client_tests():
|
|||
if subprocess.Popen(cmd, shell=True, cwd=root, executable='/bin/bash').wait() != 0:
|
||||
die(ExitStatus.PythonFailure)
|
||||
|
||||
def check_activity_monitor():
|
||||
"""Ensure that the activity monitor is running and received some messages."""
|
||||
resp = urllib2.urlopen("http://localhost:8007/debug/vars")
|
||||
debug_vars = json.loads(resp.read())
|
||||
assert debug_vars['messages'] > 0, "Activity Monitor received zero messages."
|
||||
|
||||
@atexit.register
|
||||
def cleanup():
|
||||
|
@ -220,36 +226,48 @@ def cleanup():
|
|||
if exit_status == ExitStatus.OK:
|
||||
print("\n\nSUCCESS")
|
||||
else:
|
||||
print("\n\nFAILURE %d" % exit_status)
|
||||
if exit_status:
|
||||
print("\n\nFAILURE %d" % exit_status)
|
||||
|
||||
exit_status = ExitStatus.OK
|
||||
exit_status = None
|
||||
tempdir = tempfile.mkdtemp()
|
||||
|
||||
parser = argparse.ArgumentParser(description='Run integration tests')
|
||||
parser.add_argument('--all', dest="run_all", action="store_true",
|
||||
help="run all of the clients' integration tests")
|
||||
parser.add_argument('--letsencrypt', dest='run_letsencrypt', action='store_true',
|
||||
help="run the letsencrypt's (the python client's) integration tests")
|
||||
parser.add_argument('--node', dest="run_node", action="store_true",
|
||||
help="run the node client's integration tests")
|
||||
parser.set_defaults(run_all=False, run_letsencrypt=False, run_node=False)
|
||||
args = parser.parse_args()
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Run integration tests')
|
||||
parser.add_argument('--all', dest="run_all", action="store_true",
|
||||
help="run all of the clients' integration tests")
|
||||
parser.add_argument('--letsencrypt', dest='run_letsencrypt', action='store_true',
|
||||
help="run the letsencrypt's (the python client's) integration tests")
|
||||
parser.add_argument('--node', dest="run_node", action="store_true",
|
||||
help="run the node client's integration tests")
|
||||
parser.set_defaults(run_all=False, run_letsencrypt=False, run_node=False)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not (args.run_all or args.run_letsencrypt or args.run_node):
|
||||
print >> sys.stderr, "must run at least one of the letsencrypt or node tests with --all, --letsencrypt, or --node"
|
||||
die(ExitStatus.IncorrectCommandLineArgs)
|
||||
if not (args.run_all or args.run_letsencrypt or args.run_node):
|
||||
print >> sys.stderr, "must run at least one of the letsencrypt or node tests with --all, --letsencrypt, or --node"
|
||||
die(ExitStatus.IncorrectCommandLineArgs)
|
||||
|
||||
if not startservers.start(race_detection=True):
|
||||
die(ExitStatus.Error)
|
||||
if not startservers.start(race_detection=True):
|
||||
die(ExitStatus.Error)
|
||||
|
||||
if args.run_all or args.run_node:
|
||||
run_node_test()
|
||||
if args.run_all or args.run_node:
|
||||
run_node_test()
|
||||
|
||||
# Simulate a disconnection from RabbitMQ to make sure reconnects work.
|
||||
startservers.bounce_forward()
|
||||
# Simulate a disconnection from RabbitMQ to make sure reconnects work.
|
||||
startservers.bounce_forward()
|
||||
|
||||
if args.run_all or args.run_letsencrypt:
|
||||
run_client_tests()
|
||||
if args.run_all or args.run_letsencrypt:
|
||||
run_client_tests()
|
||||
|
||||
if not startservers.check():
|
||||
die(ExitStatus.Error)
|
||||
check_activity_monitor()
|
||||
|
||||
if not startservers.check():
|
||||
die(ExitStatus.Error)
|
||||
exit_status = ExitStatus.OK
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except Exception:
|
||||
exit_status = ExitStatus.Error
|
||||
raise
|
||||
|
|
Loading…
Reference in New Issue