diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index 2f2b605cd..aaba04c10 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -47,12 +47,7 @@ func main() { go cmd.ProfileCmd("CA", stats) - for { - ch, err := cmd.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - + connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, ch) cmd.FailOnError(err, "Unable to create RPC client") @@ -66,11 +61,12 @@ func main() { err = rpc.NewCertificateAuthorityServer(cas, cai) cmd.FailOnError(err, "Unable to create CA server") - auditlogger.Info(app.VersionString()) - - err = cmd.RunUntilSignaled(auditlogger, cas, closeChan) - cmd.FailOnError(err, "Unable to run CA RPC server") + return cas } + + auditlogger.Info(app.VersionString()) + err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger) + cmd.FailOnError(err, "Unable to run CA RPC server") } app.Run() diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index b0c26c328..59b32a730 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -45,12 +45,7 @@ func main() { go cmd.ProfileCmd("RA", stats) - for { - ch, err := cmd.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - + connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, ch) cmd.FailOnError(err, "Unable to create RPC client") @@ -78,12 +73,12 @@ func main() { err = rpc.NewRegistrationAuthorityServer(ras, &rai) cmd.FailOnError(err, "Unable to create RA server") - auditlogger.Info(app.VersionString()) - - err = cmd.RunUntilSignaled(auditlogger, ras, closeChan) - cmd.FailOnError(err, "Unable to run RA RPC server") + return ras } + auditlogger.Info(app.VersionString()) + err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger) + cmd.FailOnError(err, "Unable to run RA RPC server") } app.Run() diff --git a/cmd/boulder-sa/main.go b/cmd/boulder-sa/main.go index 653365d30..98d156029 100644 --- a/cmd/boulder-sa/main.go +++ b/cmd/boulder-sa/main.go @@ -44,22 +44,18 @@ func main() { go cmd.ProfileCmd("SA", stats) - for { - ch, err := cmd.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - + connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { sas := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, ch) err = rpc.NewStorageAuthorityServer(sas, sai) cmd.FailOnError(err, "Could create SA RPC server") - auditlogger.Info(app.VersionString()) - - err = cmd.RunUntilSignaled(auditlogger, sas, closeChan) - cmd.FailOnError(err, "Unable to run SA RPC server") + return sas } + + auditlogger.Info(app.VersionString()) + err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger) + cmd.FailOnError(err, "Unable to run SA RPC server") } app.Run() diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index 0758a0af2..bd9392d1d 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -43,12 +43,7 @@ func main() { vai.DNSResolver = core.NewDNSResolverImpl(dnsTimeout, []string{c.Common.DNSResolver}) vai.UserAgent = c.VA.UserAgent - for { - ch, err := cmd.AmqpChannel(c) - cmd.FailOnError(err, "Could not connect to AMQP") - - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - + connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, ch) cmd.FailOnError(err, "Unable to create RPC client") @@ -62,10 +57,12 @@ func main() { err = rpc.NewValidationAuthorityServer(vas, &vai) cmd.FailOnError(err, "Unable to create VA server") - auditlogger.Info(app.VersionString()) - err = cmd.RunUntilSignaled(auditlogger, vas, closeChan) - cmd.FailOnError(err, "Unable to run SA RPC server") + return vas } + + auditlogger.Info(app.VersionString()) + err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger) + cmd.FailOnError(err, "Unable to run VA RPC server") } app.Run() diff --git a/cmd/shell.go b/cmd/shell.go index 424a051c1..82f46cec0 100644 --- a/cmd/shell.go +++ b/cmd/shell.go @@ -334,37 +334,47 @@ func RunForever(server *rpc.AmqpRPCServer) { <-forever } -// RunUntilSignaled starts the server and run until we get something on closeChan -func RunUntilSignaled(logger *blog.AuditLogger, server *rpc.AmqpRPCServer, closeChan chan *amqp.Error) error { - finishedProcessing, err := server.Start() - if err != nil { - return err - } - stopWatching, err := server.HandleInterrupts() - if err != nil { - return err - } - fmt.Fprintf(os.Stderr, "Server running...\n") - - // Block until channel closes +// RunAndReconnectUntilSignaled +func RunAndReconnectUntilSignaled(connectionHandler func(ch *amqp.Channel) *rpc.AmqpRPCServer, c Config, logger *blog.AuditLogger) error { for { - finished := true - select { - case err := <-closeChan: - logger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) - stopWatching <- true - time.Sleep(time.Second * 5) - logger.Warning("Reconnecting to AMQP...") - finished = true - case <-finishedProcessing: - logger.Info(" [!] Finished processing remaining messages, exiting") - os.Exit(0) + ch, err := AmqpChannel(c) + if err != nil { + return err } - if finished { - break + + closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) + + server := connectionHandler(ch) + + finishedProcessing, err := server.Start() + if err != nil { + return err + } + stopWatching, err := server.HandleInterrupts() + if err != nil { + return err + } + fmt.Fprintf(os.Stderr, "Server running...\n") + + // Block until channel closes + for { + finished := false + select { + case err := <-closeChan: + logger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) + stopWatching <- true + time.Sleep(time.Second * 5) + logger.Warning("Reconnecting to AMQP...") + finished = true + case <-finishedProcessing: + logger.Info(" [!] Finished processing remaining messages, exiting") + os.Exit(0) + } + if finished { + break + } } } - return nil } // ProfileCmd runs forever, sending Go statistics to StatsD. diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 770e12242..2c5e38e49 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -97,6 +97,16 @@ func AMQPDeclareExchange(conn *amqp.Connection) error { return err } +// Create a quick consumer ID we can recreate later if we need to cancel +// a consumer. +func getConsumerName(serverQueue string) (string, error) { + hostname, err := os.Hostname() + if err != nil { + return "", err + } + return fmt.Sprintf("%s.%s", serverQueue, hostname), nil +} + // A simplified way to declare and subscribe to an AMQP queue func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) { var err error @@ -126,12 +136,14 @@ func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan return nil, err } - hostname, err := os.Hostname() + consumerName, err := getConsumerName(name) if err != nil { return nil, err } - consumerName := fmt.Sprintf("%s.%s", name, hostname) + // A consumer name is used so that the specific consumer can be cancelled later + // if signalled. If no name is used a UID is used which cannot be retrieved (as + // far as I can tell). msgs, err := ch.Consume( name, consumerName, @@ -247,8 +259,9 @@ type RPCResponse struct { Error RPCError `json:"error,omitempty"` } -// Start starts the AMQP-RPC server running in a separate thread. -// There is currently no Stop() method. +// Start starts the AMQP-RPC server running in a separate thread. A channel, finished, +// is returned so that the caller can detect when the for/range messages loop has +// broke in the spawned thread so we can cleanly exit. func (rpc *AmqpRPCServer) Start() (finished chan bool, err error) { msgs, err := amqpSubscribe(rpc.channel, rpc.serverQueue, rpc.log) if err != nil { @@ -292,19 +305,19 @@ func (rpc *AmqpRPCServer) Start() (finished chan bool, err error) { return } -// HandleInterrupts creates a Goroutine to sit and watch for INT and TERM signals +// HandleInterrupts creates a Goroutine to sit and watch for INT, TERM, or HUP signals // and cancel the server consumer if it sees one so servers can gracefully shutdown. func (rpc *AmqpRPCServer) HandleInterrupts() (chan bool, error) { stopWatching := make(chan bool, 1) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) - signal.Notify(sigChan, os.Interrupt) + signal.Notify(sigChan, syscall.SIGINT) + signal.Notify(sigChan, syscall.SIGHUP) - hostname, err := os.Hostname() + consumerName, err := getConsumerName(rpc.serverQueue) if err != nil { return nil, err } - consumerName := fmt.Sprintf("%s.%s", rpc.serverQueue, hostname) go func() { finished := false