Move RabbitMQ initialization into its own binary.

Previously our executables would all try to declare the boulder exchange on
startup, which may have been leading to some race conditions in Travis. Also,
the Activity Monitor would try to bind a queue to the exchange at startup.
In prod both of these tasks are taken care of administratively, so including
them in the app code was adding unnecessary complexity. It also may have been
part of an issue causing Activity Monitor to fail to start up recently.

Also, turn the Activity Monitor into an RPC service, which gets it reconnects
for free, and add it to startservers.py.
This commit is contained in:
Jacob Hoffman-Andrews 2015-11-25 22:00:45 -08:00
parent 3961f4ab25
commit 9e4b0c1e5b
9 changed files with 153 additions and 184 deletions

View File

@ -66,6 +66,9 @@ Set up a database:
> cd $GOPATH/src/github.com/letsencrypt/boulder
> ./test/create_db.sh
Set up RabbitMQ:
> go run cmd/rabbitmq-setup/main.go -server amqp://localhost
**Note**: `create_db.sh` uses the root MariaDB user with the default
password, so if you have disabled that account or changed the password
you may have to adjust the file or recreate the commands.

View File

@ -10,9 +10,6 @@ package main
// broker to look for anomalies.
import (
"fmt"
"os"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
@ -22,106 +19,24 @@ import (
"github.com/letsencrypt/boulder/rpc"
)
// Constants for AMQP. startMonitor passes these to its queue declare, queue
// bind and consume calls; not every constant is referenced by the code shown
// alongside this block.
const (
// QueueName is the queue startMonitor declares and consumes from.
QueueName = "Monitor"
// AmqpExchange is the exchange the Monitor queue is bound to.
AmqpExchange = "boulder"
AmqpExchangeType = "topic"
AmqpInternal = false
AmqpDurable = false
AmqpDeleteUnused = false
AmqpExclusive = false
AmqpNoWait = false
AmqpNoLocal = false
AmqpAutoAck = false
AmqpMandatory = false
AmqpImmediate = false
)
// startMonitor consumes every delivery from the Monitor queue on rpcCh and
// hands each one to a logging analysis engine. It returns only when the
// deliveries channel is closed.
//
// If a passive declare reports that the queue is missing, it attempts to
// create the queue and bind it to the boulder exchange with a wildcard
// routing key. Setup failures are reported through cmd.FailOnError. The stats
// parameter is accepted but not used.
//
// NOTE(review): the fallback QueueDeclare reuses rpcCh after the passive
// declare has failed. Confirm the channel is still usable at that point; if it
// is not, the queue has to be created administratively before start-up.
func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.Statter) {
	ae := analysisengine.NewLoggingAnalysisEngine()

	// For convenience at the broker, identify ourselves by hostname
	consumerTag, err := os.Hostname()
	cmd.FailOnError(err, "Could not determine hostname")

	_, err = rpcCh.QueueDeclarePassive(
		QueueName,
		AmqpDurable,
		AmqpDeleteUnused,
		AmqpExclusive,
		AmqpNoWait,
		nil)
	if err != nil {
		logger.Info(fmt.Sprintf("Queue %s does not exist on AMQP server, attempting to create.", QueueName))

		// Attempt to create the Queue if not exists
		_, err = rpcCh.QueueDeclare(
			QueueName,
			AmqpDurable,
			AmqpDeleteUnused,
			AmqpExclusive,
			AmqpNoWait,
			nil)
		cmd.FailOnError(err, "Could not declare queue")

		routingKey := "#" // wildcard
		err = rpcCh.QueueBind(
			QueueName,
			routingKey,
			AmqpExchange,
			false,
			nil)
		if err != nil {
			// Only build the long hint when the bind actually failed.
			txt := fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", QueueName, QueueName, routingKey)
			cmd.FailOnError(err, txt)
		}
	}

	deliveries, err := rpcCh.Consume(
		QueueName,
		consumerTag,
		AmqpAutoAck,
		AmqpExclusive,
		AmqpNoLocal,
		AmqpNoWait,
		nil)
	cmd.FailOnError(err, "Could not subscribe to queue")

	// Run forever.
	for d := range deliveries {
		// Pass each message to the Analysis Engine
		if err := ae.ProcessMessage(d); err != nil {
			// Deliveries that could not be processed are left unacknowledged.
			logger.Alert(fmt.Sprintf("Could not process message: %s", err))
			continue
		}

		// Only ack the delivery we actually handled (ackMultiple=false)
		const ackMultiple = false
		if err := d.Ack(ackMultiple); err != nil {
			// Report a failed ack instead of silently dropping it.
			logger.Alert(fmt.Sprintf("Could not ack message: %s", err))
		}
	}
}
func main() {
app := cmd.NewAppShell("activity-monitor", "RPC activity monitor")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
go cmd.DebugServer(c.ActivityMonitor.DebugAddr)
ch, err := rpc.AmqpChannel(c.ActivityMonitor.AMQP)
amqpConf := c.ActivityMonitor.AMQP
server, err := rpc.NewAmqpRPCServer(amqpConf, 0, stats)
cmd.FailOnError(err, "Could not connect to AMQP")
ae := analysisengine.NewLoggingAnalysisEngine()
server.HandleDeliveries(rpc.DeliveryHandler(func(d amqp.Delivery) {
ae.ProcessMessage(d)
}))
go cmd.ProfileCmd("AM", stats)
startMonitor(ch, auditlogger, stats)
server.Start(amqpConf)
}
app.Run()

View File

@ -0,0 +1,78 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package main
// This command does a one-time setup of the RabbitMQ exchange and the Activity
// Monitor queue, suitable for setting up a dev environment or Travis.
import (
"flag"
"fmt"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/cmd"
)
// server holds the value of the -server command-line flag, described as the
// RabbitMQ server URL. It defaults to the empty string.
var server = flag.String("server", "", "RabbitMQ Server URL")
// init parses the command-line flags during package initialization, so that
// *server is populated by the time main dereferences it.
// NOTE(review): consider calling flag.Parse from main instead of init.
func init() {
flag.Parse()
}
// Constants for AMQP. main passes these to the exchange declare, queue
// declare and queue bind calls.
const (
// monitorQueueName is the queue that main declares and binds.
monitorQueueName = "Monitor"
// amqpExchange is the exchange that main declares and binds the queue to.
amqpExchange = "boulder"
amqpExchangeType = "topic"
amqpInternal = false
amqpDurable = false
amqpDeleteUnused = false
amqpExclusive = false
amqpNoWait = false
)
// main performs the one-time RabbitMQ setup: it connects to the server named
// by the -server flag, declares the boulder exchange, declares the Monitor
// queue, and binds that queue to the exchange with a wildcard routing key.
// Every failure is reported through cmd.FailOnError.
func main() {
	// Use a distinct local name rather than shadowing the package-level flag.
	serverURL := *server

	conn, err := amqp.Dial(serverURL)
	cmd.FailOnError(err, "Could not connect to AMQP")
	// Close the connection once setup has finished.
	defer conn.Close()

	ch, err := conn.Channel()
	cmd.FailOnError(err, "Could not open AMQP channel")

	err = ch.ExchangeDeclare(
		amqpExchange,
		amqpExchangeType,
		amqpDurable,
		amqpDeleteUnused,
		amqpInternal,
		amqpNoWait,
		nil)
	cmd.FailOnError(err, "Declaring exchange")

	_, err = ch.QueueDeclare(
		monitorQueueName,
		amqpDurable,
		amqpDeleteUnused,
		amqpExclusive,
		amqpNoWait,
		nil)
	cmd.FailOnError(err, "Could not declare queue")

	routingKey := "#" // wildcard
	err = ch.QueueBind(
		monitorQueueName,
		routingKey,
		amqpExchange,
		false,
		nil)
	if err != nil {
		// Only build the long hint when the bind actually failed.
		txt := fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", monitorQueueName, monitorQueueName, routingKey)
		cmd.FailOnError(err, txt)
	}
}

View File

@ -58,59 +58,6 @@ const (
consumerName = "boulder"
)
// amqpDeclareExchange makes sure the configured AMQP exchange exists on conn.
// It first checks with a passive declare and returns nil if that succeeds.
// Otherwise it opens a new channel and declares the exchange, returning the
// error if a channel cannot be opened or the declare fails.
func amqpDeclareExchange(conn *amqp.Connection) error {
	logger := blog.GetAuditLogger()

	probe, err := conn.Channel()
	if err != nil {
		logger.Crit(fmt.Sprintf("Could not connect Channel: %s", err))
		return err
	}

	// Passive declare: succeeds only when the exchange is already present.
	err = probe.ExchangeDeclarePassive(
		AmqpExchange,
		AmqpExchangeType,
		AmqpDurable,
		AmqpDeleteUnused,
		AmqpInternal,
		AmqpNoWait,
		nil)
	if err == nil {
		probe.Close()
		return nil
	}

	logger.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, creating.", AmqpExchange))

	// The probing channel is invalid after the failed passive declare, so
	// discard it and open a new one for the real declare.
	probe.Close()
	creator, err := conn.Channel()
	if err != nil {
		logger.Crit(fmt.Sprintf("Could not connect Channel: %s", err))
		return err
	}

	err = creator.ExchangeDeclare(
		AmqpExchange,
		AmqpExchangeType,
		AmqpDurable,
		AmqpDeleteUnused,
		AmqpInternal,
		AmqpNoWait,
		nil)
	if err != nil {
		logger.Crit(fmt.Sprintf("Could not declare exchange: %s", err))
		creator.Close()
		return err
	}

	logger.Info(fmt.Sprintf("Created exchange %s.", AmqpExchange))
	creator.Close()
	return nil
}
// A simplified way to declare and subscribe to an AMQP queue
func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
var err error
@ -136,8 +83,8 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
nil)
if err != nil {
err = fmt.Errorf(
"Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.",
name, name, routingKey)
"Could not bind to queue %s: %s. NOTE: You may need to delete it to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.",
err, name, routingKey)
return nil, err
}
@ -156,6 +103,10 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
return msgs, nil
}
// DeliveryHandler is a function that will process an amqp.Delivery.
type DeliveryHandler func(amqp.Delivery)
// messageHandler is the signature of the per-method handlers registered with
// Handle and stored in dispatchTable: it takes a []byte and returns a []byte
// and an error.
type messageHandler func([]byte) ([]byte, error)
// AmqpRPCServer listens on a specified queue within an AMQP channel.
// When messages arrive on that queue, it dispatches them based on type,
// and returns the response to the ReplyTo queue.
@ -163,10 +114,13 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) {
// To implement specific functionality, using code should use the Handle
// method to add specific actions.
type AmqpRPCServer struct {
serverQueue string
connection *amqpConnector
log *blog.AuditLogger
dispatchTable map[string]func([]byte) ([]byte, error)
serverQueue string
connection *amqpConnector
log *blog.AuditLogger
handleDelivery DeliveryHandler
// Servers that just care about messages (method + body) add entries to
// dispatchTable
dispatchTable map[string]messageHandler
connected bool
done bool
mu sync.RWMutex
@ -195,7 +149,7 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests i
serverQueue: amqpConf.ServiceQueue,
connection: newAMQPConnector(amqpConf.ServiceQueue, reconnectBase, reconnectMax),
log: log,
dispatchTable: make(map[string]func([]byte) ([]byte, error)),
dispatchTable: make(map[string]messageHandler),
maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests,
clk: clock.Default(),
stats: stats,
@ -203,8 +157,19 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests i
}
// Handle registers a function to handle a particular method.
func (rpc *AmqpRPCServer) Handle(method string, handler func([]byte) ([]byte, error)) {
func (rpc *AmqpRPCServer) Handle(method string, handler messageHandler) {
rpc.mu.Lock()
rpc.dispatchTable[method] = handler
rpc.mu.Unlock()
}
// HandleDeliveries registers a handler that receives each amqp.Delivery
// directly (e.g. for the ActivityMonitor). If no such handler is registered,
// processMessage is used by default.
func (rpc *AmqpRPCServer) HandleDeliveries(handler DeliveryHandler) {
	rpc.mu.Lock()
	defer rpc.mu.Unlock()
	rpc.handleDelivery = handler
}
// rpcError is a JSON wrapper for error as it cannot be un/marshalled
@ -325,8 +290,8 @@ func (r rpcResponse) debugString() string {
return fmt.Sprintf("%s, RPCERR: %v", ret, r.Error)
}
// AmqpChannel sets a AMQP connection up using SSL if configuration is provided
func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) {
// makeAmqpChannel sets up an AMQP connection, using SSL if configuration is provided
func makeAmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) {
var conn *amqp.Connection
var err error
@ -390,11 +355,6 @@ func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) {
return nil, err
}
err = amqpDeclareExchange(conn)
if err != nil {
return nil, err
}
return conn.Channel()
}
@ -475,7 +435,11 @@ func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error {
atomic.AddInt64(&rpc.currentGoroutines, 1)
defer atomic.AddInt64(&rpc.currentGoroutines, -1)
startedProcessing := rpc.clk.Now()
rpc.processMessage(msg)
if rpc.handleDelivery != nil {
rpc.handleDelivery(msg)
} else {
rpc.processMessage(msg)
}
rpc.stats.TimingDuration(fmt.Sprintf("RPC.ServerProcessingLatency.%s", msg.Type), time.Since(startedProcessing), 1.0)
}()
} else {

View File

@ -35,7 +35,7 @@ type channelMaker interface {
type defaultChannelMaker struct{}
func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) {
return AmqpChannel(conf)
return makeAmqpChannel(conf)
}
// amqpConnector encapsulates an AMQP channel and a subscription to a specific

View File

@ -12,5 +12,5 @@ type Client interface {
// Server describes the functions an RPC Server performs
type Server interface {
Handle(string, func([]byte) ([]byte, error))
Handle(string, messageHandler)
}

View File

@ -243,6 +243,7 @@
"activityMonitor": {
"debugAddr": "localhost:8007",
"amqp": {
"serviceQueue": "Monitor",
"server": "amqp://guest:guest@localhost:5673",
"insecure": true
}

View File

@ -63,6 +63,7 @@ def start(race_detection):
t.daemon = True
t.start()
progs = [
'activity-monitor',
'boulder-wfe',
'boulder-ra',
'boulder-sa',

View File

@ -1,16 +1,30 @@
#!/bin/bash
set -o xtrace
# Travis does shallow clones, so there is no master branch present.
# But test-no-outdated-migrations.sh needs to check diffs against master.
# Fetch just the master branch from origin.
( git fetch origin master
git branch master FETCH_HEAD ) &
# Github-PR-Status secret
if [ -n "$encrypted_53b2630f0fb4_key" ]; then
openssl aes-256-cbc \
-K $encrypted_53b2630f0fb4_key -iv $encrypted_53b2630f0fb4_iv \
-in test/github-secret.json.enc -out /tmp/github-secret.json -d
if [ "${TRAVIS}" == "true" ]; then
# Boulder consists of multiple Go packages, which
# refer to each other by their absolute GitHub path,
# e.g. github.com/letsencrypt/boulder/analysis. That means, by default, if
# someone forks the repo, Travis won't pass on their own repo. To fix that,
# we add a symlink.
mkdir -p $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt
if [ ! -d $GOPATH/src/github.com/letsencrypt/boulder ] ; then
ln -s $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt/boulder
fi
# Travis does shallow clones, so there is no master branch present.
# But test-no-outdated-migrations.sh needs to check diffs against master.
# Fetch just the master branch from origin.
( git fetch origin master
git branch master FETCH_HEAD ) &
# Github-PR-Status secret
if [ -n "$encrypted_53b2630f0fb4_key" ]; then
openssl aes-256-cbc \
-K $encrypted_53b2630f0fb4_key -iv $encrypted_53b2630f0fb4_iv \
-in test/github-secret.json.enc -out /tmp/github-secret.json -d
fi
else
alias travis_retry=""
fi
travis_retry go get \
@ -27,17 +41,10 @@ travis_retry go get \
zcat goose.gz > $GOPATH/bin/goose &&
chmod +x $GOPATH/bin/goose) &
# Set up rabbitmq exchange and activity monitor queue
go run cmd/rabbitmq-setup/main.go -server amqp://localhost &
# Wait for all the background commands to finish.
wait
# Boulder consists of multiple Go packages, which
# refer to each other by their absolute GitHub path,
# e.g. github.com/letsencrypt/boulder/analysis. That means, by default, if
# someone forks the repo, Travis won't pass on their own repo. To fix that,
# we add a symlink.
mkdir -p $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt
if [ ! -d $GOPATH/src/github.com/letsencrypt/boulder ] ; then
ln -s $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt/boulder
fi
set +o xtrace