diff --git a/README.md b/README.md index 3a62dea52..3983bc6a1 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,9 @@ Set up a database: > cd $GOPATH/src/github.com/letsencrypt/boulder > ./test/create_db.sh +Set up RabbitMQ: + > go run cmd/rabbitmq-setup/main.go -server amqp://localhost + **Note**: `create_db.sh` uses the root MariaDB user with the default password, so if you have disabled that account or changed the password you may have to adjust the file or recreate the commands. diff --git a/cmd/activity-monitor/main.go b/cmd/activity-monitor/main.go index 55c8a64ae..7557b6b6b 100644 --- a/cmd/activity-monitor/main.go +++ b/cmd/activity-monitor/main.go @@ -10,9 +10,6 @@ package main // broker to look for anomalies. import ( - "fmt" - "os" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" @@ -22,106 +19,24 @@ import ( "github.com/letsencrypt/boulder/rpc" ) -// Constants for AMQP -const ( - QueueName = "Monitor" - AmqpExchange = "boulder" - AmqpExchangeType = "topic" - AmqpInternal = false - AmqpDurable = false - AmqpDeleteUnused = false - AmqpExclusive = false - AmqpNoWait = false - AmqpNoLocal = false - AmqpAutoAck = false - AmqpMandatory = false - AmqpImmediate = false -) - -func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger, stats statsd.Statter) { - ae := analysisengine.NewLoggingAnalysisEngine() - - // For convenience at the broker, identifiy ourselves by hostname - consumerTag, err := os.Hostname() - if err != nil { - cmd.FailOnError(err, "Could not determine hostname") - } - - _, err = rpcCh.QueueDeclarePassive( - QueueName, - AmqpDurable, - AmqpDeleteUnused, - AmqpExclusive, - AmqpNoWait, - nil) - if err != nil { - logger.Info(fmt.Sprintf("Queue %s does not exist on AMQP server, attempting to create.", QueueName)) - - // Attempt to create the Queue if not exists - _, err = rpcCh.QueueDeclare( - QueueName, - AmqpDurable, - AmqpDeleteUnused, - AmqpExclusive, - AmqpNoWait, - nil) - if err != nil { - cmd.FailOnError(err, "Could not declare queue") - } - - routingKey := "#" //wildcard - - err = rpcCh.QueueBind( - QueueName, - routingKey, - AmqpExchange, - false, - nil) - if err != nil { - txt := fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", QueueName, QueueName, routingKey) - cmd.FailOnError(err, txt) - } - } - - deliveries, err := rpcCh.Consume( - QueueName, - consumerTag, - AmqpAutoAck, - AmqpExclusive, - AmqpNoLocal, - AmqpNoWait, - nil) - if err != nil { - cmd.FailOnError(err, "Could not subscribe to queue") - } - - // Run forever. - for d := range deliveries { - // Pass each message to the Analysis Engine - err = ae.ProcessMessage(d) - if err != nil { - logger.Alert(fmt.Sprintf("Could not process message: %s", err)) - } else { - // Only ack the delivery we actually handled (ackMultiple=false) - const ackMultiple = false - d.Ack(ackMultiple) - } - } -} - func main() { app := cmd.NewAppShell("activity-monitor", "RPC activity monitor") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { go cmd.DebugServer(c.ActivityMonitor.DebugAddr) - ch, err := rpc.AmqpChannel(c.ActivityMonitor.AMQP) - + amqpConf := c.ActivityMonitor.AMQP + server, err := rpc.NewAmqpRPCServer(amqpConf, 0, stats) cmd.FailOnError(err, "Could not connect to AMQP") + ae := analysisengine.NewLoggingAnalysisEngine() + server.HandleDeliveries(rpc.DeliveryHandler(func(d amqp.Delivery) { + ae.ProcessMessage(d) + })) + go cmd.ProfileCmd("AM", stats) - startMonitor(ch, auditlogger, stats) + server.Start(amqpConf) } app.Run() diff --git a/cmd/rabbitmq-setup/main.go b/cmd/rabbitmq-setup/main.go new file mode 100644 index 000000000..450b86c30 --- /dev/null +++ b/cmd/rabbitmq-setup/main.go @@ -0,0 +1,78 @@ +// Copyright 2014 ISRG. All rights reserved +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +// This command does a one-time setup of the RabbitMQ exchange and the Activity +// Monitor queue, suitable for setting up a dev environment or Travis. + +import ( + "flag" + "fmt" + + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" + + "github.com/letsencrypt/boulder/cmd" +) + +var server = flag.String("server", "", "RabbitMQ Server URL") + +func init() { + flag.Parse() +} + +// Constants for AMQP +const ( + monitorQueueName = "Monitor" + amqpExchange = "boulder" + amqpExchangeType = "topic" + amqpInternal = false + amqpDurable = false + amqpDeleteUnused = false + amqpExclusive = false + amqpNoWait = false +) + +func main() { + server := *server + conn, err := amqp.Dial(server) + cmd.FailOnError(err, "Could not connect to AMQP") + ch, err := conn.Channel() + cmd.FailOnError(err, "Could not connect to AMQP") + + err = ch.ExchangeDeclare( + amqpExchange, + amqpExchangeType, + amqpDurable, + amqpDeleteUnused, + amqpInternal, + amqpNoWait, + nil) + cmd.FailOnError(err, "Declaring exchange") + + _, err = ch.QueueDeclare( + monitorQueueName, + amqpDurable, + amqpDeleteUnused, + amqpExclusive, + amqpNoWait, + nil) + if err != nil { + cmd.FailOnError(err, "Could not declare queue") + } + + routingKey := "#" //wildcard + + err = ch.QueueBind( + monitorQueueName, + routingKey, + amqpExchange, + false, + nil) + if err != nil { + txt := fmt.Sprintf("Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", monitorQueueName, monitorQueueName, routingKey) + cmd.FailOnError(err, txt) + } +} diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 55994783d..c3d188c05 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -58,59 +58,6 @@ const ( consumerName = "boulder" ) -// AMQPDeclareExchange attempts to declare the configured AMQP exchange, -// returning silently if already declared, erroring if nonexistant and -// unable to create. -func amqpDeclareExchange(conn *amqp.Connection) error { - var err error - var ch *amqp.Channel - log := blog.GetAuditLogger() - - ch, err = conn.Channel() - if err != nil { - log.Crit(fmt.Sprintf("Could not connect Channel: %s", err)) - return err - } - - err = ch.ExchangeDeclarePassive( - AmqpExchange, - AmqpExchangeType, - AmqpDurable, - AmqpDeleteUnused, - AmqpInternal, - AmqpNoWait, - nil) - if err != nil { - log.Info(fmt.Sprintf("Exchange %s does not exist on AMQP server, creating.", AmqpExchange)) - - // Channel is invalid at this point, so recreate - ch.Close() - ch, err = conn.Channel() - if err != nil { - log.Crit(fmt.Sprintf("Could not connect Channel: %s", err)) - return err - } - - err = ch.ExchangeDeclare( - AmqpExchange, - AmqpExchangeType, - AmqpDurable, - AmqpDeleteUnused, - AmqpInternal, - AmqpNoWait, - nil) - if err != nil { - log.Crit(fmt.Sprintf("Could not declare exchange: %s", err)) - ch.Close() - return err - } - log.Info(fmt.Sprintf("Created exchange %s.", AmqpExchange)) - } - - ch.Close() - return err -} - // A simplified way to declare and subscribe to an AMQP queue func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) { var err error @@ -136,8 +83,8 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) { nil) if err != nil { err = fmt.Errorf( - "Could not bind to queue [%s]. NOTE: You may need to delete %s to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", - name, name, routingKey) + "Could not bind to queue %s: %s. NOTE: You may need to delete it to re-trigger the bind attempt after fixing permissions, or manually bind the queue to %s.", + err, name, routingKey) return nil, err } @@ -156,6 +103,10 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) { return msgs, nil } +// DeliveryHandler is a function that will process an amqp.DeliveryHandler +type DeliveryHandler func(amqp.Delivery) +type messageHandler func([]byte) ([]byte, error) + // AmqpRPCServer listens on a specified queue within an AMQP channel. // When messages arrive on that queue, it dispatches them based on type, // and returns the response to the ReplyTo queue. @@ -163,10 +114,13 @@ func amqpSubscribe(ch amqpChannel, name string) (<-chan amqp.Delivery, error) { // To implement specific functionality, using code should use the Handle // method to add specific actions. type AmqpRPCServer struct { - serverQueue string - connection *amqpConnector - log *blog.AuditLogger - dispatchTable map[string]func([]byte) ([]byte, error) + serverQueue string + connection *amqpConnector + log *blog.AuditLogger + handleDelivery DeliveryHandler + // Servers that just care about messages (method + body) add entries to + // dispatchTable + dispatchTable map[string]messageHandler connected bool done bool mu sync.RWMutex @@ -195,7 +149,7 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests i serverQueue: amqpConf.ServiceQueue, connection: newAMQPConnector(amqpConf.ServiceQueue, reconnectBase, reconnectMax), log: log, - dispatchTable: make(map[string]func([]byte) ([]byte, error)), + dispatchTable: make(map[string]messageHandler), maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests, clk: clock.Default(), stats: stats, @@ -203,8 +157,19 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests i } // Handle registers a function to handle a particular method. -func (rpc *AmqpRPCServer) Handle(method string, handler func([]byte) ([]byte, error)) { +func (rpc *AmqpRPCServer) Handle(method string, handler messageHandler) { + rpc.mu.Lock() rpc.dispatchTable[method] = handler + rpc.mu.Unlock() +} + +// HandleDeliveries allows a server to receive amqp.Delivery directly (e.g. +// ActivityMonitor), it can provide one of these. Otherwise processMessage is +// used by default. +func (rpc *AmqpRPCServer) HandleDeliveries(handler DeliveryHandler) { + rpc.mu.Lock() + rpc.handleDelivery = handler + rpc.mu.Unlock() } // rpcError is a JSON wrapper for error as it cannot be un/marshalled @@ -325,8 +290,8 @@ func (r rpcResponse) debugString() string { return fmt.Sprintf("%s, RPCERR: %v", ret, r.Error) } -// AmqpChannel sets a AMQP connection up using SSL if configuration is provided -func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) { +// makeAmqpChannel sets a AMQP connection up using SSL if configuration is provided +func makeAmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) { var conn *amqp.Connection var err error @@ -390,11 +355,6 @@ func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) { return nil, err } - err = amqpDeclareExchange(conn) - if err != nil { - return nil, err - } - return conn.Channel() } @@ -475,7 +435,11 @@ func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error { atomic.AddInt64(&rpc.currentGoroutines, 1) defer atomic.AddInt64(&rpc.currentGoroutines, -1) startedProcessing := rpc.clk.Now() - rpc.processMessage(msg) + if rpc.handleDelivery != nil { + rpc.handleDelivery(msg) + } else { + rpc.processMessage(msg) + } rpc.stats.TimingDuration(fmt.Sprintf("RPC.ServerProcessingLatency.%s", msg.Type), time.Since(startedProcessing), 1.0) }() } else { diff --git a/rpc/connection.go b/rpc/connection.go index 98f05b47a..88e87d066 100644 --- a/rpc/connection.go +++ b/rpc/connection.go @@ -35,7 +35,7 @@ type channelMaker interface { type defaultChannelMaker struct{} func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) { - return AmqpChannel(conf) + return makeAmqpChannel(conf) } // amqpConnector encapsulates an AMQP channel and a subscription to a specific diff --git a/rpc/rpc-interfaces.go b/rpc/rpc-interfaces.go index 85c6f369c..9003c7e59 100644 --- a/rpc/rpc-interfaces.go +++ b/rpc/rpc-interfaces.go @@ -12,5 +12,5 @@ type Client interface { // Server describes the functions an RPC Server performs type Server interface { - Handle(string, func([]byte) ([]byte, error)) + Handle(string, messageHandler) } diff --git a/test/boulder-config.json b/test/boulder-config.json index 15c316b7d..0a062901b 100644 --- a/test/boulder-config.json +++ b/test/boulder-config.json @@ -243,6 +243,7 @@ "activityMonitor": { "debugAddr": "localhost:8007", "amqp": { + "serviceQueue": "Monitor", "server": "amqp://guest:guest@localhost:5673", "insecure": true } diff --git a/test/startservers.py b/test/startservers.py index 8ae3cf76e..73a7471d5 100644 --- a/test/startservers.py +++ b/test/startservers.py @@ -63,6 +63,7 @@ def start(race_detection): t.daemon = True t.start() progs = [ + 'activity-monitor', 'boulder-wfe', 'boulder-ra', 'boulder-sa', diff --git a/test/travis-before-install.sh b/test/travis-before-install.sh index 2ea4e3788..169b259d2 100755 --- a/test/travis-before-install.sh +++ b/test/travis-before-install.sh @@ -1,16 +1,30 @@ #!/bin/bash set -o xtrace -# Travis does shallow clones, so there is no master branch present. -# But test-no-outdated-migrations.sh needs to check diffs against master. -# Fetch just the master branch from origin. -( git fetch origin master -git branch master FETCH_HEAD ) & -# Github-PR-Status secret -if [ -n "$encrypted_53b2630f0fb4_key" ]; then - openssl aes-256-cbc \ - -K $encrypted_53b2630f0fb4_key -iv $encrypted_53b2630f0fb4_iv \ - -in test/github-secret.json.enc -out /tmp/github-secret.json -d +if [ "${TRAVIS}" == "true" ]; then + # Boulder consists of multiple Go packages, which + # refer to each other by their absolute GitHub path, + # e.g. github.com/letsencrypt/boulder/analysis. That means, by default, if + # someone forks the repo, Travis won't pass on their own repo. To fix that, + # we add a symlink. + mkdir -p $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt + if [ ! -d $GOPATH/src/github.com/letsencrypt/boulder ] ; then + ln -s $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt/boulder + fi + + # Travis does shallow clones, so there is no master branch present. + # But test-no-outdated-migrations.sh needs to check diffs against master. + # Fetch just the master branch from origin. + ( git fetch origin master + git branch master FETCH_HEAD ) & + # Github-PR-Status secret + if [ -n "$encrypted_53b2630f0fb4_key" ]; then + openssl aes-256-cbc \ + -K $encrypted_53b2630f0fb4_key -iv $encrypted_53b2630f0fb4_iv \ + -in test/github-secret.json.enc -out /tmp/github-secret.json -d + fi +else + alias travis_retry="" fi travis_retry go get \ @@ -27,17 +41,10 @@ travis_retry go get \ zcat goose.gz > $GOPATH/bin/goose && chmod +x $GOPATH/bin/goose) & +# Set up rabbitmq exchange and activity monitor queue +go run cmd/rabbitmq-setup/main.go -server amqp://localhost & + # Wait for all the background commands to finish. wait -# Boulder consists of multiple Go packages, which -# refer to each other by their absolute GitHub path, -# e.g. github.com/letsencrypt/boulder/analysis. That means, by default, if -# someone forks the repo, Travis won't pass on their own repo. To fix that, -# we add a symlink. -mkdir -p $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt -if [ ! -d $GOPATH/src/github.com/letsencrypt/boulder ] ; then - ln -s $TRAVIS_BUILD_DIR $GOPATH/src/github.com/letsencrypt/boulder -fi - set +o xtrace