diff --git a/cmd/activity-monitor/main.go b/cmd/activity-monitor/main.go index dec4f7fb5..95feb41d4 100644 --- a/cmd/activity-monitor/main.go +++ b/cmd/activity-monitor/main.go @@ -16,6 +16,7 @@ import ( "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" + "github.com/letsencrypt/boulder/rpc" "github.com/letsencrypt/boulder/analysis" "github.com/letsencrypt/boulder/cmd" @@ -153,7 +154,7 @@ func main() { go cmd.DebugServer(c.ActivityMonitor.DebugAddr) - ch, err := cmd.AmqpChannel(c) + ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") diff --git a/cmd/admin-revoker/main.go b/cmd/admin-revoker/main.go index 54999ff16..a60cebb9f 100644 --- a/cmd/admin-revoker/main.go +++ b/cmd/admin-revoker/main.go @@ -67,7 +67,7 @@ func setupContext(context *cli.Context) (rpc.CertificateAuthorityClient, *blog.A cmd.FailOnError(err, "Could not connect to Syslog") blog.SetAuditLogger(auditlogger) - ch, err := cmd.AmqpChannel(c) + ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") caRPC, err := rpc.NewAmqpRPCClient("revoker->CA", c.AMQP.CA.Server, ch) diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index 4323219e0..300bbd589 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -7,7 +7,6 @@ package main import ( "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/ca" "github.com/letsencrypt/boulder/cmd" @@ -47,25 +46,23 @@ func main() { go cmd.ProfileCmd("CA", stats) - connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { - saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, ch) + connectionHandler := func(srv *rpc.AmqpRPCServer) { + saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, srv.Channel) cmd.FailOnError(err, "Unable to create RPC client") sac, err := rpc.NewStorageAuthorityClient(saRPC) cmd.FailOnError(err, "Failed to create SA client") cai.SA = &sac - - cas := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, ch) - - err = rpc.NewCertificateAuthorityServer(cas, cai) - cmd.FailOnError(err, "Unable to create CA server") - - return cas } + cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, connectionHandler) + cmd.FailOnError(err, "Unable to create CA RPC server") + rpc.NewCertificateAuthorityServer(cas, cai) + auditlogger.Info(app.VersionString()) - err = cmd.RunUntilSignaled(connectionHandler, c, auditlogger) + + err = cas.Start(c) cmd.FailOnError(err, "Unable to run CA RPC server") } diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index 2e1020a6b..ba9b45bbd 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -9,7 +9,6 @@ import ( "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/core" "github.com/letsencrypt/boulder/cmd" @@ -45,14 +44,14 @@ func main() { go cmd.ProfileCmd("RA", stats) - connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { - vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, ch) + connectionHandler := func(srv *rpc.AmqpRPCServer) { + vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, srv.Channel) cmd.FailOnError(err, "Unable to create RPC client") - caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, ch) + caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, srv.Channel) cmd.FailOnError(err, "Unable to create RPC client") - saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, ch) + saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, srv.Channel) cmd.FailOnError(err, "Unable to create RPC client") vac, err := rpc.NewValidationAuthorityClient(vaRPC) @@ -67,17 +66,15 @@ func main() { rai.VA = &vac rai.CA = &cac rai.SA = &sac - - ras := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, ch) - - err = rpc.NewRegistrationAuthorityServer(ras, &rai) - cmd.FailOnError(err, "Unable to create RA server") - - return ras } + ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, connectionHandler) + cmd.FailOnError(err, "Unable to create RA RPC server") + rpc.NewRegistrationAuthorityServer(ras, &rai) + auditlogger.Info(app.VersionString()) - err = cmd.RunUntilSignaled(connectionHandler, c, auditlogger) + + err = ras.Start(c) cmd.FailOnError(err, "Unable to run RA RPC server") } diff --git a/cmd/boulder-sa/main.go b/cmd/boulder-sa/main.go index 13fa002b9..da477d82c 100644 --- a/cmd/boulder-sa/main.go +++ b/cmd/boulder-sa/main.go @@ -7,7 +7,6 @@ package main import ( "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/cmd" blog "github.com/letsencrypt/boulder/log" @@ -44,17 +43,15 @@ func main() { go cmd.ProfileCmd("SA", stats) - connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { - sas := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, ch) + connectionHandler := func(*rpc.AmqpRPCServer) {} - err = rpc.NewStorageAuthorityServer(sas, sai) - cmd.FailOnError(err, "Could create SA RPC server") - - return sas - } + sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, connectionHandler) + cmd.FailOnError(err, "Unable to create SA RPC server") + rpc.NewStorageAuthorityServer(sas, sai) auditlogger.Info(app.VersionString()) - err = cmd.RunUntilSignaled(connectionHandler, c, auditlogger) + + err = sas.Start(c) cmd.FailOnError(err, "Unable to run SA RPC server") } diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index 1762eb106..79eb11f22 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -9,7 +9,6 @@ import ( "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/core" @@ -43,25 +42,23 @@ func main() { vai.DNSResolver = core.NewDNSResolverImpl(dnsTimeout, []string{c.Common.DNSResolver}) vai.UserAgent = c.VA.UserAgent - connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer { - raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, ch) + connectionHandler := func(srv *rpc.AmqpRPCServer) { + raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, srv.Channel) cmd.FailOnError(err, "Unable to create RPC client") rac, err := rpc.NewRegistrationAuthorityClient(raRPC) cmd.FailOnError(err, "Unable to create RA client") vai.RA = &rac - - vas := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, ch) - - err = rpc.NewValidationAuthorityServer(vas, &vai) - cmd.FailOnError(err, "Unable to create VA server") - - return vas } + vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, connectionHandler) + cmd.FailOnError(err, "Unable to create VA RPC server") + rpc.NewValidationAuthorityServer(vas, &vai) + auditlogger.Info(app.VersionString()) - err = cmd.RunUntilSignaled(connectionHandler, c, auditlogger) + + err = vas.Start(c) cmd.FailOnError(err, "Unable to run VA RPC server") } diff --git a/cmd/boulder-wfe/main.go b/cmd/boulder-wfe/main.go index 653e8d3ea..eb5c8dc6d 100644 --- a/cmd/boulder-wfe/main.go +++ b/cmd/boulder-wfe/main.go @@ -20,8 +20,9 @@ import ( ) func setupWFE(c cmd.Config) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) { - ch, err := cmd.AmqpChannel(c) + ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") + auditlogger.Info(" [!] Connected to AMQP") closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) @@ -117,12 +118,11 @@ func main() { // with new RA and SA rpc clients. for { for err := range closeChan { - auditlogger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) + auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) time.Sleep(time.Second * 5) rac, sac, closeChan = setupWFE(c) wfe.RA = &rac wfe.SA = &sac - auditlogger.Warning("Reconnected to AMQP") } } }() diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index e42668e4e..a62968a76 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -37,7 +37,7 @@ type OCSPUpdater struct { } func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) { - ch, err := cmd.AmqpChannel(c) + ch, err := rpc.AmqpChannel(c) cmd.FailOnError(err, "Could not connect to AMQP") closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) @@ -223,7 +223,7 @@ func main() { // Abort if we disconnect from AMQP for { for err := range closeChan { - auditlogger.Warning(fmt.Sprintf("AMQP Channel closed, aborting early: [%s]", err)) + auditlogger.Warning(fmt.Sprintf(" [!] AMQP Channel closed, aborting early: [%s]", err)) panic(err) } } diff --git a/cmd/shell.go b/cmd/shell.go index f97d8207a..c4c9b95eb 100644 --- a/cmd/shell.go +++ b/cmd/shell.go @@ -22,8 +22,6 @@ package cmd import ( - "crypto/tls" - "crypto/x509" "encoding/json" "encoding/pem" "errors" @@ -35,16 +33,12 @@ import ( _ "net/http/pprof" "os" "runtime" - "strings" "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/codegangsta/cli" - "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" "github.com/letsencrypt/boulder/ca" "github.com/letsencrypt/boulder/core" - blog "github.com/letsencrypt/boulder/log" - "github.com/letsencrypt/boulder/rpc" ) // Config stores configuration parameters that applications @@ -256,126 +250,6 @@ func FailOnError(err error, msg string) { } } -// AmqpChannel is the same as amqpConnect in boulder, but with even -// more aggressive error dropping -func AmqpChannel(conf Config) (*amqp.Channel, error) { - var conn *amqp.Connection - var err error - - log := blog.GetAuditLogger() - - if conf.AMQP.TLS == nil { - // Configuration did not specify TLS options, but Dial will - // use TLS anyway if the URL scheme is "amqps" - conn, err = amqp.Dial(conf.AMQP.Server) - } else { - // They provided TLS options, so let's load them. - log.Info("AMQPS: Loading TLS Options.") - - if strings.HasPrefix(conf.AMQP.Server, "amqps") == false { - err = fmt.Errorf("AMQPS: TLS configuration provided, but not using an AMQPS URL") - return nil, err - } - - cfg := new(tls.Config) - - // If the configuration specified a certificate (or key), load them - if conf.AMQP.TLS.CertFile != nil || conf.AMQP.TLS.KeyFile != nil { - // But they have to give both. - if conf.AMQP.TLS.CertFile == nil || conf.AMQP.TLS.KeyFile == nil { - err = fmt.Errorf("AMQPS: You must set both of the configuration values AMQP.TLS.KeyFile and AMQP.TLS.CertFile") - return nil, err - } - - cert, err := tls.LoadX509KeyPair(*conf.AMQP.TLS.CertFile, *conf.AMQP.TLS.KeyFile) - if err != nil { - err = fmt.Errorf("AMQPS: Could not load Client Certificate or Key: %s", err) - return nil, err - } - - log.Info("AMQPS: Configured client certificate for AMQPS.") - cfg.Certificates = append(cfg.Certificates, cert) - } - - // If the configuration specified a CA certificate, make it the only - // available root. - if conf.AMQP.TLS.CACertFile != nil { - cfg.RootCAs = x509.NewCertPool() - - ca, err := ioutil.ReadFile(*conf.AMQP.TLS.CACertFile) - if err != nil { - err = fmt.Errorf("AMQPS: Could not load CA Certificate: %s", err) - return nil, err - } - cfg.RootCAs.AppendCertsFromPEM(ca) - log.Info("AMQPS: Configured CA certificate for AMQPS.") - } - - conn, err = amqp.DialTLS(conf.AMQP.Server, cfg) - } - - if err != nil { - return nil, err - } - - err = rpc.AMQPDeclareExchange(conn) - if err != nil { - return nil, err - } - - return conn.Channel() -} - -// RunForever starts the server and wait around -func RunForever(server *rpc.AmqpRPCServer) { - forever := make(chan bool) - server.Start() - fmt.Fprintf(os.Stderr, "Server running...\n") - <-forever -} - -// RunUntilSignaled starts the RPC server and runs in a loop reconnecting if the -// AMQP channel is closed. On SIGINT/SIGTERM/SIGHUP it will wait until the consumer -// has finished processing any retrieved messages before returning. -func RunUntilSignaled(connectionHandler func(ch *amqp.Channel) *rpc.AmqpRPCServer, c Config, logger *blog.AuditLogger) error { - for { - // Setup AMQP channel - ch, err := AmqpChannel(c) - if err != nil { - return err - } - - // Setup AmqpRPCServer will required handlers/clients - server := connectionHandler(ch) - - finishedProcessing, err := server.Start() - if err != nil { - return err - } - stopWatching, err := server.HandleInterrupts() - if err != nil { - return err - } - fmt.Fprintf(os.Stderr, "Server running...\n") - - // Block until channel closes - closeChan := ch.NotifyClose(make(chan *amqp.Error, 1)) - for finished := false; !finished; { - select { - case err := <-closeChan: - logger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) - stopWatching <- true - time.Sleep(time.Second * 5) - logger.Warning("Reconnecting to AMQP...") - finished = true - case <-finishedProcessing: - logger.Info(" [!] Finished processing remaining messages, exiting") - os.Exit(0) - } - } - } -} - // ProfileCmd runs forever, sending Go statistics to StatsD. func ProfileCmd(profileName string, stats statsd.Statter) { for { diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index bfefcf17e..6276de34c 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -6,16 +6,22 @@ package rpc import ( + "crypto/rand" + "crypto/tls" + "crypto/x509" "encoding/json" "errors" "fmt" + "io/ioutil" "os" "os/signal" + "strings" "sync" "syscall" "time" "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp" + "github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/core" blog "github.com/letsencrypt/boulder/log" ) @@ -97,18 +103,8 @@ func AMQPDeclareExchange(conn *amqp.Connection) error { return err } -// Create a quick consumer ID we can recreate later if we need to cancel -// a consumer. -func getConsumerName(serverQueue string) (string, error) { - hostname, err := os.Hostname() - if err != nil { - return "", err - } - return fmt.Sprintf("%s.%s", serverQueue, hostname), nil -} - // A simplified way to declare and subscribe to an AMQP queue -func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) { +func amqpSubscribe(ch *amqp.Channel, name string, consumerName string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) { var err error _, err = ch.QueueDeclare( @@ -136,11 +132,6 @@ func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan return nil, err } - consumerName, err := getConsumerName(name) - if err != nil { - return nil, err - } - // A consumer name is used so that the specific consumer can be cancelled later // if signalled. If no name is used a UID is used which cannot be retrieved (as // far as I can tell). @@ -167,23 +158,32 @@ func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan // To implement specific functionality, using code should use the Handle // method to add specific actions. type AmqpRPCServer struct { - serverQueue string - channel *amqp.Channel - log *blog.AuditLogger - dispatchTable map[string]func([]byte) ([]byte, error) + serverQueue string + Channel *amqp.Channel + log *blog.AuditLogger + dispatchTable map[string]func([]byte) ([]byte, error) + connectionHandler func(*AmqpRPCServer) + consumerName string + connected bool + done bool + dMu sync.Mutex } // NewAmqpRPCServer creates a new RPC server on the given queue and channel. // Note that you must call Start() to actually start the server // listening for requests. -func NewAmqpRPCServer(serverQueue string, channel *amqp.Channel) *AmqpRPCServer { +func NewAmqpRPCServer(serverQueue string, handler func(*AmqpRPCServer)) (*AmqpRPCServer, error) { log := blog.GetAuditLogger() + b := make([]byte, 4) + rand.Read(b) + consumerName := fmt.Sprintf("%s.%x", serverQueue, b) return &AmqpRPCServer{ - serverQueue: serverQueue, - channel: channel, - log: log, - dispatchTable: make(map[string]func([]byte) ([]byte, error)), - } + serverQueue: serverQueue, + log: log, + dispatchTable: make(map[string]func([]byte) ([]byte, error)), + connectionHandler: handler, + consumerName: consumerName, + }, nil } // Handle registers a function to handle a particular method. @@ -259,80 +259,186 @@ type RPCResponse struct { Error RPCError `json:"error,omitempty"` } -// Start starts the AMQP-RPC server running in a separate thread. A channel, finished, -// is returned so that the caller can detect when the for/range messages loop has -// broke in the spawned thread so we can cleanly exit. -func (rpc *AmqpRPCServer) Start() (finished chan bool, err error) { - msgs, err := amqpSubscribe(rpc.channel, rpc.serverQueue, rpc.log) - if err != nil { - return +// AmqpChannel sets a AMQP connection up using SSL if configuration is provided +func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { + var conn *amqp.Connection + var err error + + log := blog.GetAuditLogger() + + if conf.AMQP.TLS == nil { + // Configuration did not specify TLS options, but Dial will + // use TLS anyway if the URL scheme is "amqps" + conn, err = amqp.Dial(conf.AMQP.Server) + } else { + // They provided TLS options, so let's load them. + log.Info("AMQPS: Loading TLS Options.") + + if strings.HasPrefix(conf.AMQP.Server, "amqps") == false { + err = fmt.Errorf("AMQPS: TLS configuration provided, but not using an AMQPS URL") + return nil, err + } + + cfg := new(tls.Config) + + // If the configuration specified a certificate (or key), load them + if conf.AMQP.TLS.CertFile != nil || conf.AMQP.TLS.KeyFile != nil { + // But they have to give both. + if conf.AMQP.TLS.CertFile == nil || conf.AMQP.TLS.KeyFile == nil { + err = fmt.Errorf("AMQPS: You must set both of the configuration values AMQP.TLS.KeyFile and AMQP.TLS.CertFile") + return nil, err + } + + cert, err := tls.LoadX509KeyPair(*conf.AMQP.TLS.CertFile, *conf.AMQP.TLS.KeyFile) + if err != nil { + err = fmt.Errorf("AMQPS: Could not load Client Certificate or Key: %s", err) + return nil, err + } + + log.Info("AMQPS: Configured client certificate for AMQPS.") + cfg.Certificates = append(cfg.Certificates, cert) + } + + // If the configuration specified a CA certificate, make it the only + // available root. + if conf.AMQP.TLS.CACertFile != nil { + cfg.RootCAs = x509.NewCertPool() + + ca, err := ioutil.ReadFile(*conf.AMQP.TLS.CACertFile) + if err != nil { + err = fmt.Errorf("AMQPS: Could not load CA Certificate: %s", err) + return nil, err + } + cfg.RootCAs.AppendCertsFromPEM(ca) + log.Info("AMQPS: Configured CA certificate for AMQPS.") + } + + conn, err = amqp.DialTLS(conf.AMQP.Server, cfg) } - finished = make(chan bool, 1) - go func() { - for msg := range msgs { - // XXX-JWS: jws.Verify(body) - cb, present := rpc.dispatchTable[msg.Type] - rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) - if !present { - // AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e - rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) - continue - } - var response RPCResponse - response.ReturnVal, err = cb(msg.Body) - response.Error = wrapError(err) - jsonResponse, err := json.Marshal(response) - if err != nil { - // AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3 - rpc.log.Audit(fmt.Sprintf(" [s>][%s][%s] Error condition marshalling RPC response %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, msg.CorrelationId)) - continue - } - rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId)) - rpc.channel.Publish( - AmqpExchange, - msg.ReplyTo, - AmqpMandatory, - AmqpImmediate, - amqp.Publishing{ - CorrelationId: msg.CorrelationId, - Type: msg.Type, - Body: jsonResponse, // XXX-JWS: jws.Sign(privKey, body) - }) - } - finished <- true - }() - return + if err != nil { + return nil, err + } + + err = AMQPDeclareExchange(conn) + if err != nil { + return nil, err + } + + return conn.Channel() } -// HandleInterrupts creates a Goroutine to sit and watch for INT, TERM, or HUP signals -// and cancel the server consumer if it sees one so servers can gracefully shutdown. -func (rpc *AmqpRPCServer) HandleInterrupts() (chan bool, error) { - stopWatching := make(chan bool, 1) +func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) { + // XXX-JWS: jws.Verify(body) + cb, present := rpc.dispatchTable[msg.Type] + rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) + if !present { + // AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e + rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) + return + } + var response RPCResponse + var err error + response.ReturnVal, err = cb(msg.Body) + response.Error = wrapError(err) + jsonResponse, err := json.Marshal(response) + if err != nil { + // AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3 + rpc.log.Audit(fmt.Sprintf(" [s>][%s][%s] Error condition marshalling RPC response %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, msg.CorrelationId)) + return + } + rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId)) + rpc.Channel.Publish( + AmqpExchange, + msg.ReplyTo, + AmqpMandatory, + AmqpImmediate, + amqp.Publishing{ + CorrelationId: msg.CorrelationId, + Type: msg.Type, + Body: jsonResponse, // XXX-JWS: jws.Sign(privKey, body) + }) +} + +// Start starts the AMQP-RPC server and handles reconnections, this will block +// until a fatal error is returned or AmqpRPCServer.Stop() is called and all +// remaining messages are processed. +func (rpc *AmqpRPCServer) Start(c cmd.Config) error { + go rpc.catchSignals() + for { + rpc.dMu.Lock() + if rpc.done { + rpc.dMu.Unlock() + break + } + rpc.dMu.Unlock() + var err error + rpc.Channel, err = AmqpChannel(c) + if err != nil { + return err + } + rpc.connectionHandler(rpc) + + msgs, err := amqpSubscribe(rpc.Channel, rpc.serverQueue, rpc.consumerName, rpc.log) + if err != nil { + return err + } + rpc.connected = true + rpc.log.Info(" [!] Connected to AMQP") + + closeChan := rpc.Channel.NotifyClose(make(chan *amqp.Error, 1)) + for blocking := true; blocking; { + select { + case msg, ok := <-msgs: + if ok { + rpc.processMessage(msg) + } else { + rpc.log.Info(" [!] Finished processing messages") + rpc.done = true + blocking = false + } + case err = <-closeChan: + rpc.connected = false + rpc.log.Warning(fmt.Sprintf(" [!] AMQP Channel closed, will reconnect in 5 seconds: [%s]", err)) + time.Sleep(time.Second * 5) + blocking = false + } + } + } + return nil +} + +var signalToName = map[os.Signal]string{ + syscall.SIGTERM: "SIGTERM", + syscall.SIGINT: "SIGINT", + syscall.SIGHUP: "SIGHUP", +} + +func (rpc *AmqpRPCServer) catchSignals() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGHUP) - consumerName, err := getConsumerName(rpc.serverQueue) - if err != nil { - return nil, err + sig := <-sigChan + rpc.log.Info(fmt.Sprintf(" [!] Caught %s", signalToName[sig])) + rpc.Stop() + signal.Stop(sigChan) +} + +// Stop gracefully stops the AmqpRPCServer, after calling AmqpRPCServer.Start will +// continue blocking until it has processed any messages that have already been +// retrieved. +func (rpc *AmqpRPCServer) Stop() { + if rpc.connected { + rpc.log.Info(" [!] Shutting down RPC server, stopping new deliveries and processing remaining messages") + rpc.Channel.Cancel(rpc.consumerName, false) + } else { + rpc.log.Info("[!] Shutting down RPC server, nothing to clean up") + rpc.dMu.Lock() + rpc.done = true + rpc.dMu.Unlock() } - - go func() { - for finished := false; !finished; { - select { - case <-sigChan: - rpc.log.Info(" [!] SIGTERM/SIGINT recieved, stopping new deliveries and processing remaining messages") - rpc.channel.Cancel(consumerName, false) - finished = true - case <-stopWatching: - finished = true - } - } - }() - - return stopWatching, nil } // AmqpRPCCLient is an AMQP-RPC client that sends requests to a specific server @@ -383,7 +489,7 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, channel *amqp.Chann } // Subscribe to the response queue and dispatch - msgs, err := amqpSubscribe(rpc.channel, clientQueue, rpc.log) + msgs, err := amqpSubscribe(rpc.channel, clientQueue, "", rpc.log) if err != nil { return nil, err }