From 7dcfcd7864e5e9c4b9116bb4972f0b4b84dba57d Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Tue, 17 Nov 2015 14:11:06 -0800 Subject: [PATCH 1/3] Add configurable RPC timeouts per backend. In the process, break out AMQP config into its own struct, one per service. The AMQPConfig struct is included by composition in the config structs that need it. If any given service lacks an AMQP config of its own, it gets a default value from the top-level AMQP config struct, for deployability reasons. Tightens the RPC code to take a specific AMQP config, not an over-broad cmd.Config. Shortens construction of specific RPC clients so they instatiate the generic client connection themselves, simplifying per-service startup code. Remove unused SetTimeout method on RPC clients. --- cmd/activity-monitor/main.go | 3 +- cmd/admin-revoker/main.go | 14 ++-- cmd/boulder-ca/main.go | 19 ++--- cmd/boulder-publisher/main.go | 14 ++-- cmd/boulder-ra/main.go | 29 +++---- cmd/boulder-sa/main.go | 10 ++- cmd/boulder-va/main.go | 13 ++- cmd/boulder-wfe/main.go | 18 ++--- cmd/config.go | 105 ++++++++++++------------ cmd/expiration-mailer/main.go | 7 +- cmd/ocsp-updater/main.go | 30 +++---- cmd/shell.go | 29 +++++++ rpc/amqp-rpc.go | 69 ++++++++-------- rpc/connection.go | 8 +- rpc/connection_test.go | 8 +- rpc/rpc-interfaces.go | 5 -- rpc/rpc-wrappers.go | 32 ++++---- rpc/rpc-wrappers_test.go | 16 +--- test/boulder-config.json | 148 +++++++++++++++++++++++++--------- 19 files changed, 316 insertions(+), 261 deletions(-) diff --git a/cmd/activity-monitor/main.go b/cmd/activity-monitor/main.go index 8a0eaf404..c84c28936 100644 --- a/cmd/activity-monitor/main.go +++ b/cmd/activity-monitor/main.go @@ -115,7 +115,8 @@ func main() { app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { go cmd.DebugServer(c.ActivityMonitor.DebugAddr) - ch, err := rpc.AmqpChannel(c) + amqpConf := c.ActivityMonitor.AMQP + ch, err := rpc.AmqpChannel(amqpConf) cmd.FailOnError(err, "Could not connect to AMQP") diff --git a/cmd/admin-revoker/main.go b/cmd/admin-revoker/main.go index e110aa887..d244aee22 100644 --- a/cmd/admin-revoker/main.go +++ b/cmd/admin-revoker/main.go @@ -42,22 +42,18 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. stats, auditlogger := cmd.StatsAndLogging(c.Statsd, c.Syslog) - raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - rac, err := rpc.NewRegistrationAuthorityClient(raRPC) + amqpConf := c.Revoker.AMQP + clientName := "AdminRevoker" + rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create CA client") dbMap, err := sa.NewDbMap(c.Revoker.DBConnect) cmd.FailOnError(err, "Couldn't setup database connection") - saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - sac, err := rpc.NewStorageAuthorityClient(saRPC) + sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create SA client") - return rac, auditlogger, dbMap, sac + return *rac, auditlogger, dbMap, *sac } func addDeniedNames(tx *gorp.Transaction, names []string) (err error) { diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index 6a5004b4b..798f14e01 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -41,26 +41,19 @@ func main() { go cmd.ProfileCmd("CA", stats) - saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - sac, err := rpc.NewStorageAuthorityClient(saRPC) + amqpConf := c.CA.AMQP + clientName := "CA" + cai.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create SA client") - pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - pubc, err := rpc.NewPublisherClient(pubRPC) + cai.Publisher, err = rpc.NewPublisherClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create Publisher client") - cai.Publisher = &pubc - cai.SA = &sac - - cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, c.CA.MaxConcurrentRPCServerRequests, c) + cas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.CA, c.CA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create CA RPC server") rpc.NewCertificateAuthorityServer(cas, cai) - err = cas.Start(c) + err = cas.Start(amqpConf) cmd.FailOnError(err, "Unable to run CA RPC server") } diff --git a/cmd/boulder-publisher/main.go b/cmd/boulder-publisher/main.go index 7be9e8044..a0b7bf1b4 100644 --- a/cmd/boulder-publisher/main.go +++ b/cmd/boulder-publisher/main.go @@ -23,19 +23,17 @@ func main() { go cmd.DebugServer(c.Publisher.DebugAddr) go cmd.ProfileCmd("Publisher", stats) - saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create SA RPC client") - - sac, err := rpc.NewStorageAuthorityClient(saRPC) + pubConf := c.Publisher + amqpConf := pubConf.AMQP + clientName := "Publisher" + pubi.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create SA client") - pubi.SA = &sac - - pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, c.Publisher.MaxConcurrentRPCServerRequests, c) + pubs, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.Publisher, pubConf.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create Publisher RPC server") rpc.NewPublisherServer(pubs, &pubi) - err = pubs.Start(c) + err = pubs.Start(amqpConf) cmd.FailOnError(err, "Unable to run Publisher RPC server") } diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index 9e94761d8..e46d6e2dc 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -39,27 +39,20 @@ func main() { go cmd.ProfileCmd("RA", stats) - vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - vac, err := rpc.NewValidationAuthorityClient(vaRPC) + amqpConf := c.RA.AMQP + clientName := "RA" + vac, err := rpc.NewValidationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create VA client") - cac, err := rpc.NewCertificateAuthorityClient(caRPC) + cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create CA client") - sac, err := rpc.NewStorageAuthorityClient(saRPC) + sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create SA client") var dc *ra.DomainCheck if c.RA.UseIsSafeDomain { - dc = &ra.DomainCheck{VA: &vac} + dc = &ra.DomainCheck{VA: vac} } rai := ra.NewRegistrationAuthorityImpl(clock.Default(), auditlogger, stats, @@ -73,15 +66,15 @@ func main() { rai.DNSResolver = core.NewTestDNSResolverImpl(raDNSTimeout, []string{c.Common.DNSResolver}) } - rai.VA = &vac - rai.CA = &cac - rai.SA = &sac + rai.VA = vac + rai.CA = cac + rai.SA = sac - ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, c.RA.MaxConcurrentRPCServerRequests, c) + ras, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.RA, c.RA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create RA RPC server") rpc.NewRegistrationAuthorityServer(ras, rai) - err = ras.Start(c) + err = ras.Start(amqpConf) cmd.FailOnError(err, "Unable to run RA RPC server") } diff --git a/cmd/boulder-sa/main.go b/cmd/boulder-sa/main.go index b37f91bf2..e5f7dd2ad 100644 --- a/cmd/boulder-sa/main.go +++ b/cmd/boulder-sa/main.go @@ -17,9 +17,10 @@ import ( func main() { app := cmd.NewAppShell("boulder-sa", "Handles SQL operations") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { - go cmd.DebugServer(c.SA.DebugAddr) + saConf := c.SA + go cmd.DebugServer(saConf.DebugAddr) - dbMap, err := sa.NewDbMap(c.SA.DBConnect) + dbMap, err := sa.NewDbMap(saConf.DBConnect) cmd.FailOnError(err, "Couldn't connect to SA database") sai, err := sa.NewSQLStorageAuthority(dbMap, clock.Default()) @@ -28,11 +29,12 @@ func main() { go cmd.ProfileCmd("SA", stats) - sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, c.SA.MaxConcurrentRPCServerRequests, c) + amqpConf := saConf.AMQP + sas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.SA, c.SA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create SA RPC server") rpc.NewStorageAuthorityServer(sas, sai) - err = sas.Start(c) + err = sas.Start(amqpConf) cmd.FailOnError(err, "Unable to run SA RPC server") } diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index c9f5e92ff..12b09c805 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -50,19 +50,18 @@ func main() { } vai.UserAgent = c.VA.UserAgent - raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - rac, err := rpc.NewRegistrationAuthorityClient(raRPC) + amqpConf := c.VA.AMQP + clientName := "VA" + rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create RA client") - vai.RA = &rac + vai.RA = rac - vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, c.VA.MaxConcurrentRPCServerRequests, c) + vas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.VA, c.VA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create VA RPC server") rpc.NewValidationAuthorityServer(vas, vai) - err = vas.Start(c) + err = vas.Start(amqpConf) cmd.FailOnError(err, "Unable to run VA RPC server") } diff --git a/cmd/boulder-wfe/main.go b/cmd/boulder-wfe/main.go index b5c56d36b..c5e55c031 100644 --- a/cmd/boulder-wfe/main.go +++ b/cmd/boulder-wfe/main.go @@ -22,17 +22,13 @@ import ( "github.com/letsencrypt/boulder/wfe" ) -func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient) { - raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - rac, err := rpc.NewRegistrationAuthorityClient(raRPC) +func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (*rpc.RegistrationAuthorityClient, *rpc.StorageAuthorityClient) { + amqpConf := c.WFE.AMQP + clientName := "WFE" + rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create RA client") - sac, err := rpc.NewStorageAuthorityClient(saRPC) + sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create SA client") return rac, sac @@ -59,8 +55,8 @@ func main() { wfe, err := wfe.NewWebFrontEndImpl(stats, clock.Default()) cmd.FailOnError(err, "Unable to create WFE") rac, sac := setupWFE(c, auditlogger, stats) - wfe.RA = &rac - wfe.SA = &sac + wfe.RA = rac + wfe.SA = sac wfe.SubscriberAgreementURL = c.SubscriberAgreementURL wfe.AllowOrigins = c.WFE.AllowOrigins diff --git a/cmd/config.go b/cmd/config.go index 5562a90dc..c320cf7f0 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -24,28 +24,15 @@ import ( // Note: NO DEFAULTS are provided. type Config struct { ActivityMonitor struct { - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string + ServiceConfig } - // General - AMQP struct { - Server string - Insecure bool - RA Queue - VA Queue - SA Queue - CA Queue - OCSP Queue - Publisher Queue - TLS *TLSConfig - ReconnectTimeouts struct { - Base ConfigDuration - Max ConfigDuration - } - } + // Default AMQPConfig for services that don't specify one. + // TODO(jsha): Delete this after a deploy. + AMQP *AMQPConfig WFE struct { + ServiceConfig BaseURL string ListenAddress string @@ -58,19 +45,13 @@ type Config struct { ShutdownStopTimeout string ShutdownKillTimeout string - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } CA CAConfig - Monolith struct { - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string - } - RA struct { + ServiceConfig + RateLimitPoliciesFilename string MaxConcurrentRPCServerRequests int64 @@ -79,21 +60,19 @@ type Config struct { // UseIsSafeDomain determines whether to call VA.IsSafeDomain UseIsSafeDomain bool // TODO(jmhodges): remove after va IsSafeDomain deploy - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } SA struct { + ServiceConfig + DBConnect string MaxConcurrentRPCServerRequests int64 - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } VA struct { + ServiceConfig + UserAgent string PortConfig va.PortConfig @@ -101,9 +80,6 @@ type Config struct { MaxConcurrentRPCServerRequests int64 GoogleSafeBrowsing *GoogleSafeBrowsingConfig - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } SQL struct { @@ -116,9 +92,14 @@ type Config struct { Revoker struct { DBConnect string + // The revoker isn't a long running service, so doesn't get a full + // ServiceConfig, just an AMQPConfig. + AMQP *AMQPConfig } Mailer struct { + ServiceConfig + Server string Port string Username string @@ -134,12 +115,11 @@ type Config struct { NagCheckInterval string // Path to a text/template email template EmailTemplate string - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } OCSPResponder struct { + ServiceConfig + // Source indicates the source of pre-signed OCSP responses to be used. It // can be a DBConnect string or a file URL. The file URL style is used // when responding from a static file for intermediates and roots. @@ -153,18 +133,13 @@ type Config struct { ShutdownStopTimeout string ShutdownKillTimeout string - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } OCSPUpdater OCSPUpdaterConfig Publisher struct { + ServiceConfig MaxConcurrentRPCServerRequests int64 - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } ExternalCertImporter struct { @@ -197,10 +172,37 @@ type Config struct { SubscriberAgreementURL string } +// ServiceConfig contains config items that are common to all our services, to +// be embedded in other config structs. +type ServiceConfig struct { + // DebugAddr is the address to run the /debug handlers on. + DebugAddr string + AMQP *AMQPConfig +} + +// AMQPConfig describes how to connect to AMQP, and how to speak to each of the +// RPC services we offer via AMQP. +type AMQPConfig struct { + Server string + Insecure bool + RA *RPCServerConfig + VA *RPCServerConfig + SA *RPCServerConfig + CA *RPCServerConfig + Publisher *RPCServerConfig + TLS *TLSConfig + ReconnectTimeouts struct { + Base ConfigDuration + Max ConfigDuration + } +} + // CAConfig structs have configuration information for the certificate // authority, including database parameters as well as controls for // issued certificates. type CAConfig struct { + ServiceConfig + Profile string TestMode bool DBConnect string @@ -219,9 +221,6 @@ type CAConfig struct { MaxConcurrentRPCServerRequests int64 HSMFaultTimeout ConfigDuration - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } // PAConfig specifies how a policy authority should connect to its @@ -280,14 +279,17 @@ type TLSConfig struct { CACertFile *string } -// Queue describes a queue name -type Queue struct { - Server string +// RPCServerConfig contains configuration particular to a specific RPC server +// type (e.g. RA, SA, etc) +type RPCServerConfig struct { + Server string // Queue name where the server receives requests + RPCTimeout ConfigDuration } // OCSPUpdaterConfig provides the various window tick times and batch sizes needed // for the OCSP (and SCT) updater type OCSPUpdaterConfig struct { + ServiceConfig DBConnect string NewCertificateWindow ConfigDuration @@ -312,9 +314,6 @@ type OCSPUpdaterConfig struct { SignFailureBackoffFactor float64 SignFailureBackoffMax ConfigDuration - - // DebugAddr is the address to run the /debug handlers on. - DebugAddr string } // GoogleSafeBrowsingConfig is the JSON config struct for the VA's use of the diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index e2b3496e1..5e0736d41 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -232,10 +232,9 @@ func main() { dbMap, err := sa.NewDbMap(c.Mailer.DBConnect) cmd.FailOnError(err, "Could not connect to database") - saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - sac, err := rpc.NewStorageAuthorityClient(saRPC) + amqpConf := c.SA.AMQP + clientName := "ExpirationMailer" + sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create SA client") // Load email template diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index ce2db0d83..ed0d24c94 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -531,28 +531,21 @@ func (l *looper) loop() error { } } -func setupClients(c cmd.Config, stats statsd.Statter) ( +func setupClients(c cmd.OCSPUpdaterConfig, stats statsd.Statter) ( core.CertificateAuthority, core.Publisher, core.StorageAuthority, ) { - caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - cac, err := rpc.NewCertificateAuthorityClient(caRPC) + amqpConf := c.AMQP + clientName := "OCSP" + cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create CA client") - pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - pubc, err := rpc.NewPublisherClient(pubRPC) + pubc, err := rpc.NewPublisherClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create Publisher client") - saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, c, stats) - cmd.FailOnError(err, "Unable to create RPC client") - - sac, err := rpc.NewStorageAuthorityClient(saRPC) - cmd.FailOnError(err, "Unable to create Publisher client") + sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) + cmd.FailOnError(err, "Unable to create SA client") return cac, pubc, sac } @@ -561,14 +554,15 @@ func main() { app := cmd.NewAppShell("ocsp-updater", "Generates and updates OCSP responses") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { - go cmd.DebugServer(c.OCSPUpdater.DebugAddr) + conf := c.OCSPUpdater + go cmd.DebugServer(conf.DebugAddr) go cmd.ProfileCmd("OCSP-Updater", stats) // Configure DB - dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect) + dbMap, err := sa.NewDbMap(conf.DBConnect) cmd.FailOnError(err, "Could not connect to database") - cac, pubc, sac := setupClients(c, stats) + cac, pubc, sac := setupClients(conf, stats) updater, err := newUpdater( stats, @@ -578,7 +572,7 @@ func main() { pubc, sac, // Necessary evil for now - c.OCSPUpdater, + conf, len(c.Common.CT.Logs), c.Common.IssuerCert, ) diff --git a/cmd/shell.go b/cmd/shell.go index e81f902dd..5f3cdd780 100644 --- a/cmd/shell.go +++ b/cmd/shell.go @@ -95,6 +95,35 @@ func (as *AppShell) Run() { config = as.Config(c, config) } + // Provide default values for each service's AMQP config section. + if config.ActivityMonitor.AMQP == nil { + config.ActivityMonitor.AMQP = config.AMQP + } + if config.WFE.AMQP == nil { + config.WFE.AMQP = config.AMQP + } + if config.CA.AMQP == nil { + config.CA.AMQP = config.AMQP + } + if config.RA.AMQP == nil { + config.RA.AMQP = config.AMQP + } + if config.SA.AMQP == nil { + config.SA.AMQP = config.AMQP + } + if config.VA.AMQP == nil { + config.VA.AMQP = config.AMQP + } + if config.Mailer.AMQP == nil { + config.Mailer.AMQP = config.AMQP + } + if config.OCSPResponder.AMQP == nil { + config.OCSPResponder.AMQP = config.AMQP + } + if config.Publisher.AMQP == nil { + config.Publisher.AMQP = config.AMQP + } + stats, auditlogger := StatsAndLogging(config.Statsd, config.Syslog) auditlogger.Info(as.VersionString()) diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 4fcab4218..c2dff2fa9 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -177,26 +177,21 @@ type AmqpRPCServer struct { // NewAmqpRPCServer creates a new RPC server for the given queue and will begin // consuming requests from the queue. To start the server you must call Start(). -func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64, c cmd.Config) (*AmqpRPCServer, error) { +func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, rpcConf *cmd.RPCServerConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) { log := blog.GetAuditLogger() - reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration + reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration if reconnectBase == 0 { reconnectBase = 20 * time.Millisecond } - reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration + reconnectMax := amqpConf.ReconnectTimeouts.Max.Duration if reconnectMax == 0 { reconnectMax = time.Minute } - stats, err := statsd.NewClient(c.Statsd.Server, c.Statsd.Prefix) - if err != nil { - return nil, err - } - return &AmqpRPCServer{ - serverQueue: serverQueue, - connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax), + serverQueue: rpcConf.Server, + connection: newAMQPConnector(rpcConf.Server, reconnectBase, reconnectMax), log: log, dispatchTable: make(map[string]func([]byte) ([]byte, error)), maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests, @@ -295,25 +290,25 @@ type rpcResponse struct { } // AmqpChannel sets a AMQP connection up using SSL if configuration is provided -func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { +func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) { var conn *amqp.Connection var err error log := blog.GetAuditLogger() - if conf.AMQP.Insecure == true { + if conf.Insecure == true { // If the Insecure flag is true, then just go ahead and connect - conn, err = amqp.Dial(conf.AMQP.Server) + conn, err = amqp.Dial(conf.Server) } else { // The insecure flag is false or not set, so we need to load up the options log.Info("AMQPS: Loading TLS Options.") - if strings.HasPrefix(conf.AMQP.Server, "amqps") == false { + if strings.HasPrefix(conf.Server, "amqps") == false { err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true") return nil, err } - if conf.AMQP.TLS == nil { + if conf.TLS == nil { err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true") return nil, err } @@ -321,14 +316,14 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { cfg := new(tls.Config) // If the configuration specified a certificate (or key), load them - if conf.AMQP.TLS.CertFile != nil || conf.AMQP.TLS.KeyFile != nil { + if conf.TLS.CertFile != nil || conf.TLS.KeyFile != nil { // But they have to give both. - if conf.AMQP.TLS.CertFile == nil || conf.AMQP.TLS.KeyFile == nil { + if conf.TLS.CertFile == nil || conf.TLS.KeyFile == nil { err = fmt.Errorf("AMQPS: You must set both of the configuration values AMQP.TLS.KeyFile and AMQP.TLS.CertFile") return nil, err } - cert, err := tls.LoadX509KeyPair(*conf.AMQP.TLS.CertFile, *conf.AMQP.TLS.KeyFile) + cert, err := tls.LoadX509KeyPair(*conf.TLS.CertFile, *conf.TLS.KeyFile) if err != nil { err = fmt.Errorf("AMQPS: Could not load Client Certificate or Key: %s", err) return nil, err @@ -340,10 +335,10 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { // If the configuration specified a CA certificate, make it the only // available root. - if conf.AMQP.TLS.CACertFile != nil { + if conf.TLS.CACertFile != nil { cfg.RootCAs = x509.NewCertPool() - ca, err := ioutil.ReadFile(*conf.AMQP.TLS.CACertFile) + ca, err := ioutil.ReadFile(*conf.TLS.CACertFile) if err != nil { err = fmt.Errorf("AMQPS: Could not load CA Certificate: %s", err) return nil, err @@ -352,7 +347,7 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) { log.Info("AMQPS: Configured CA certificate for AMQPS.") } - conn, err = amqp.DialTLS(conf.AMQP.Server, cfg) + conn, err = amqp.DialTLS(conf.Server, cfg) } if err != nil { @@ -412,7 +407,7 @@ func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) error { // Start starts the AMQP-RPC server and handles reconnections, this will block // until a fatal error is returned or AmqpRPCServer.Stop() is called and all // remaining messages are processed. -func (rpc *AmqpRPCServer) Start(c cmd.Config) error { +func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error { tooManyGoroutines := rpcResponse{ Error: wrapError(core.TooManyRPCRequestsError("RPC server has spawned too many Goroutines")), } @@ -530,7 +525,12 @@ type AmqpRPCCLient struct { } // NewAmqpRPCClient constructs an RPC client using AMQP -func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) { +func NewAmqpRPCClient( + clientQueuePrefix string, + amqpConf *cmd.AMQPConfig, + rpcConf *cmd.RPCServerConfig, + stats statsd.Statter, +) (rpc *AmqpRPCCLient, err error) { hostname, err := os.Hostname() if err != nil { return nil, err @@ -543,26 +543,31 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats } clientQueue := fmt.Sprintf("%s.%s.%x", clientQueuePrefix, hostname, randID) - reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration + reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration if reconnectBase == 0 { reconnectBase = 20 * time.Millisecond } - reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration + reconnectMax := amqpConf.ReconnectTimeouts.Max.Duration if reconnectMax == 0 { reconnectMax = time.Minute } + timeout := rpcConf.RPCTimeout.Duration + if timeout == 0 { + timeout = 10 * time.Second + } + rpc = &AmqpRPCCLient{ - serverQueue: serverQueue, + serverQueue: rpcConf.Server, clientQueue: clientQueue, connection: newAMQPConnector(clientQueue, reconnectBase, reconnectMax), pending: make(map[string]chan []byte), - timeout: 10 * time.Second, + timeout: timeout, log: blog.GetAuditLogger(), stats: stats, } - err = rpc.connection.connect(c) + err = rpc.connection.connect(amqpConf) if err != nil { return nil, err } @@ -596,7 +601,7 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats } case err = <-rpc.connection.closeChannel(): rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed : %s", rpc.clientQueue)) - rpc.connection.reconnect(c, rpc.log) + rpc.connection.reconnect(amqpConf, rpc.log) } } }() @@ -604,12 +609,6 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats return rpc, err } -// SetTimeout configures the maximum time DispatchSync will wait for a response -// before returning an error. -func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) { - rpc.timeout = ttl -} - // dispatch sends a body to the destination, and returns the id for the request // that can be used to correlate it with responses, and a response channel that // can be used to monitor for responses, or discarded for one-shot actions. diff --git a/rpc/connection.go b/rpc/connection.go index 585595add..98f05b47a 100644 --- a/rpc/connection.go +++ b/rpc/connection.go @@ -29,12 +29,12 @@ func newAMQPConnector( // channelMaker encapsulates how to create an AMQP channel. type channelMaker interface { - makeChannel(conf cmd.Config) (amqpChannel, error) + makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) } type defaultChannelMaker struct{} -func (d defaultChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) { +func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) { return AmqpChannel(conf) } @@ -68,7 +68,7 @@ func (ac *amqpConnector) closeChannel() chan *amqp.Error { // connect attempts to connect to a channel and subscribe to the named queue, // returning error if it fails. This is used at first startup, where we want to // fail fast if we can't connect. -func (ac *amqpConnector) connect(config cmd.Config) error { +func (ac *amqpConnector) connect(config *cmd.AMQPConfig) error { channel, err := ac.chMaker.makeChannel(config) if err != nil { return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err) @@ -89,7 +89,7 @@ func (ac *amqpConnector) connect(config cmd.Config) error { // reconnect attempts repeatedly to connect and subscribe to the named queue. It // will loop forever until it succeeds. This is used for a running server, where // we don't want to shut down because we lost our AMQP connection. -func (ac *amqpConnector) reconnect(config cmd.Config, log blog.SyslogWriter) { +func (ac *amqpConnector) reconnect(config *cmd.AMQPConfig, log blog.SyslogWriter) { for i := 0; ; i++ { ac.clk.Sleep(core.RetryBackoff(i, ac.retryTimeoutBase, ac.retryTimeoutMax, 2)) log.Info(fmt.Sprintf(" [!] attempting reconnect for %s", ac.queueName)) diff --git a/rpc/connection_test.go b/rpc/connection_test.go index f86b19558..4f8b73072 100644 --- a/rpc/connection_test.go +++ b/rpc/connection_test.go @@ -17,7 +17,7 @@ type mockChannelMaker struct { channel amqpChannel } -func (m mockChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) { +func (m mockChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) { return m.channel, nil } @@ -44,7 +44,7 @@ func TestConnect(t *testing.T) { mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil) mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil) mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error)) - err := ac.connect(cmd.Config{}) + err := ac.connect(&cmd.AMQPConfig{}) if err != nil { t.Fatalf("failed to connect: %s", err) } @@ -64,7 +64,7 @@ func TestConnectFail(t *testing.T) { defer finish() mockChannel.EXPECT().QueueDeclare( "fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).Return(amqp.Queue{}, errors.New("fail")) - err := ac.connect(cmd.Config{}) + err := ac.connect(&cmd.AMQPConfig{}) if err == nil { t.Fatalf("connect should have errored but did not") } @@ -89,7 +89,7 @@ func TestReconnect(t *testing.T) { log = mocks.UseMockLog() - ac.reconnect(cmd.Config{}, log) + ac.reconnect(&cmd.AMQPConfig{}, log) if ac.channel != mockChannel { t.Errorf("ac.channel was not equal to mockChannel") } diff --git a/rpc/rpc-interfaces.go b/rpc/rpc-interfaces.go index fbb38589f..85c6f369c 100644 --- a/rpc/rpc-interfaces.go +++ b/rpc/rpc-interfaces.go @@ -5,13 +5,8 @@ package rpc -import ( - "time" -) - // Client describes the functions an RPC Client performs type Client interface { - SetTimeout(time.Duration) DispatchSync(string, []byte) ([]byte, error) } diff --git a/rpc/rpc-wrappers.go b/rpc/rpc-wrappers.go index 700ca1681..48146e1ca 100644 --- a/rpc/rpc-wrappers.go +++ b/rpc/rpc-wrappers.go @@ -13,7 +13,9 @@ import ( "net" "time" + "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd" jose "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/letsencrypt/go-jose" + "github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/core" blog "github.com/letsencrypt/boulder/log" ) @@ -362,9 +364,9 @@ type RegistrationAuthorityClient struct { } // NewRegistrationAuthorityClient constructs an RPC client -func NewRegistrationAuthorityClient(client Client) (rac RegistrationAuthorityClient, err error) { - rac = RegistrationAuthorityClient{rpc: client} - return +func NewRegistrationAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*RegistrationAuthorityClient, error) { + client, err := NewAmqpRPCClient(clientName+"->RA", amqpConf, amqpConf.RA, stats) + return &RegistrationAuthorityClient{rpc: client}, err } // NewRegistration sends a New Registration request @@ -575,9 +577,9 @@ type ValidationAuthorityClient struct { } // NewValidationAuthorityClient constructs an RPC client -func NewValidationAuthorityClient(client Client) (vac ValidationAuthorityClient, err error) { - vac = ValidationAuthorityClient{rpc: client} - return +func NewValidationAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*ValidationAuthorityClient, error) { + client, err := NewAmqpRPCClient(clientName+"->VA", amqpConf, amqpConf.VA, stats) + return &ValidationAuthorityClient{rpc: client}, err } // UpdateValidations sends an Update Validations request @@ -655,9 +657,9 @@ type PublisherClient struct { } // NewPublisherClient constructs an RPC client -func NewPublisherClient(client Client) (pub PublisherClient, err error) { - pub = PublisherClient{rpc: client} - return +func NewPublisherClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*PublisherClient, error) { + client, err := NewAmqpRPCClient(clientName+"->Publisher", amqpConf, amqpConf.Publisher, stats) + return &PublisherClient{rpc: client}, err } // SubmitToCT sends a request to submit a certifcate to CT logs @@ -741,9 +743,9 @@ type CertificateAuthorityClient struct { } // NewCertificateAuthorityClient constructs an RPC client -func NewCertificateAuthorityClient(client Client) (cac CertificateAuthorityClient, err error) { - cac = CertificateAuthorityClient{rpc: client} - return +func NewCertificateAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*CertificateAuthorityClient, error) { + client, err := NewAmqpRPCClient(clientName+"->CA", amqpConf, amqpConf.CA, stats) + return &CertificateAuthorityClient{rpc: client}, err } // IssueCertificate sends a request to issue a certificate @@ -1172,9 +1174,9 @@ type StorageAuthorityClient struct { } // NewStorageAuthorityClient constructs an RPC client -func NewStorageAuthorityClient(client Client) (sac StorageAuthorityClient, err error) { - sac = StorageAuthorityClient{rpc: client} - return +func NewStorageAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*StorageAuthorityClient, error) { + client, err := NewAmqpRPCClient(clientName+"->SA", amqpConf, amqpConf.SA, stats) + return &StorageAuthorityClient{rpc: client}, err } // GetRegistration sends a request to get a registration by ID diff --git a/rpc/rpc-wrappers_test.go b/rpc/rpc-wrappers_test.go index 0fcb6fc65..9f5860a0b 100644 --- a/rpc/rpc-wrappers_test.go +++ b/rpc/rpc-wrappers_test.go @@ -8,7 +8,6 @@ package rpc import ( "encoding/json" "testing" - "time" jose "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/letsencrypt/go-jose" "github.com/letsencrypt/boulder/core" @@ -31,9 +30,6 @@ type MockRPCClient struct { NextErr error } -func (rpc *MockRPCClient) SetTimeout(ttl time.Duration) { -} - func (rpc *MockRPCClient) Dispatch(method string, body []byte) chan []byte { rpc.LastMethod = method rpc.LastBody = body @@ -63,9 +59,7 @@ func (rpc *MockRPCClient) DispatchSync(method string, body []byte) (response []b func TestRANewRegistration(t *testing.T) { mock := &MockRPCClient{} - client, err := NewRegistrationAuthorityClient(mock) - test.AssertNotError(t, err, "Client construction") - test.AssertNotNil(t, client, "Client construction") + client := RegistrationAuthorityClient{mock} var jwk jose.JsonWebKey json.Unmarshal([]byte(JWK1JSON), &jwk) @@ -75,7 +69,7 @@ func TestRANewRegistration(t *testing.T) { Key: jwk, } - _, err = client.NewRegistration(reg) + _, err := client.NewRegistration(reg) test.AssertNotError(t, err, "Updated Registration") test.Assert(t, len(mock.LastBody) > 0, "Didn't send Registration") test.AssertEquals(t, "NewRegistration", mock.LastMethod) @@ -87,15 +81,13 @@ func TestRANewRegistration(t *testing.T) { func TestGenerateOCSP(t *testing.T) { mock := &MockRPCClient{} - client, err := NewCertificateAuthorityClient(mock) - test.AssertNotError(t, err, "Client construction") - test.AssertNotNil(t, client, "Client construction") + client := CertificateAuthorityClient{mock} req := core.OCSPSigningRequest{ // nope } mock.NextResp = []byte{} - _, err = client.GenerateOCSP(req) + _, err := client.GenerateOCSP(req) test.AssertError(t, err, "Should have failed at signer") } diff --git a/test/boulder-config.json b/test/boulder-config.json index 4c8b6dbde..2ce281f11 100644 --- a/test/boulder-config.json +++ b/test/boulder-config.json @@ -2,37 +2,7 @@ "syslog": { "network": "", "server": "", - "stdoutlevel": -1 - }, - - "amqp": { - "server": "amqp://guest:guest@localhost:5673", - "insecure": true, - "-uncomment_for_AMQPS-tls": { - "cacertfile": "/etc/boulder/rabbitmq-cacert.pem", - "certfile": "/etc/boulder/rabbitmq-cert.pem", - "keyfile": "/etc/boulder/rabbitmq-key.pem" - }, - "RA": { - "client": "RA.client", - "server": "RA.server" - }, - "VA": { - "client": "VA.client", - "server": "VA.server" - }, - "SA": { - "client": "SA.client", - "server": "SA.server" - }, - "CA": { - "client": "CA.client", - "server": "CA.server" - }, - "Publisher": { - "client": "Publisher.client", - "server": "Publisher.server" - } + "stdoutlevel": 7 }, "statsd": { @@ -49,7 +19,19 @@ "issuerCacheDuration": "48h", "shutdownStopTimeout": "10s", "shutdownKillTimeout": "1m", - "debugAddr": "localhost:8000" + "debugAddr": "localhost:8000", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "RA": { + "server": "RA.server", + "rpcTimeout": "1s" + }, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + } + } }, "ca": { @@ -112,7 +94,20 @@ } }, "maxConcurrentRPCServerRequests": 16, - "hsmFaultTimeout": "300s" + "hsmFaultTimeout": "300s", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "CA": { "server": "CA.server" }, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + }, + "Publisher": { + "server": "Publisher.server", + "rpcTimeout": "1s" + } + } }, "pa": { @@ -130,13 +125,36 @@ "rateLimitPoliciesFilename": "test/rate-limit-policies.yml", "maxConcurrentRPCServerRequests": 16, "maxContactsPerRegistration": 100, - "debugAddr": "localhost:8002" + "debugAddr": "localhost:8002", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "RA": { "server": "RA.server" }, + "VA": { + "server": "VA.server", + "rpcTimeout": "60s" + }, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + }, + "CA": { + "server": "CA.server", + "rpcTimeout": "1s" + + } + } }, "sa": { "dbConnect": "mysql+tcp://sa@localhost:3306/boulder_sa_integration", "maxConcurrentRPCServerRequests": 16, - "debugAddr": "localhost:8003" + "debugAddr": "localhost:8003", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "SA": { "server": "SA.server" } + } }, "va": { @@ -147,7 +165,16 @@ "httpsPort": 5001, "tlsPort": 5001 }, - "maxConcurrentRPCServerRequests": 16 + "maxConcurrentRPCServerRequests": 16, + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "VA": { "server": "VA.server" }, + "RA": { + "server": "RA.server", + "rpcTimeout": "1s" + } + } }, "sql": { @@ -155,7 +182,19 @@ }, "revoker": { - "dbConnect": "mysql+tcp://revoker@localhost:3306/boulder_sa_integration" + "dbConnect": "mysql+tcp://revoker@localhost:3306/boulder_sa_integration", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "RA": { + "server": "RA.server", + "rpcTimeout": "1s" + }, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + } + } }, "ocspResponder": { @@ -182,11 +221,31 @@ "oldestIssuedSCT": "72h", "signFailureBackoffFactor": 1.2, "signFailureBackoffMax": "30m", - "debugAddr": "localhost:8006" + "debugAddr": "localhost:8006", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + }, + "CA": { + "server": "CA.server", + "rpcTimeout": "1s" + }, + "Publisher": { + "server": "Publisher.server", + "rpcTimeout": "1s" + } + } }, "activityMonitor": { - "debugAddr": "localhost:8007" + "debugAddr": "localhost:8007", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true + } }, "mailer": { @@ -204,7 +263,16 @@ "publisher": { "maxConcurrentRPCServerRequests": 16, - "debugAddr": "localhost:8009" + "debugAddr": "localhost:8009", + "amqp": { + "server": "amqp://guest:guest@localhost:5673", + "insecure": true, + "Publisher": { "server": "Publisher.server" }, + "SA": { + "server": "SA.server", + "rpcTimeout": "1s" + } + } }, "common": { From 5fb7be64b0acd28fdd6c72ab23afbc281cbc15c4 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 18 Nov 2015 17:40:45 -0800 Subject: [PATCH 2/3] Make ServiceQueue a separate config param. Also, make clientName strings into constants. --- cmd/activity-monitor/main.go | 3 +-- cmd/admin-revoker/main.go | 3 ++- cmd/boulder-ca/main.go | 5 +++-- cmd/boulder-publisher/main.go | 8 ++++---- cmd/boulder-ra/main.go | 5 +++-- cmd/boulder-sa/main.go | 2 +- cmd/boulder-va/main.go | 5 +++-- cmd/boulder-wfe/main.go | 3 ++- cmd/config.go | 19 +++++++++++-------- cmd/expiration-mailer/main.go | 3 ++- cmd/ocsp-updater/main.go | 3 ++- rpc/amqp-rpc.go | 6 +++--- test/boulder-config.json | 10 +++++----- 13 files changed, 42 insertions(+), 33 deletions(-) diff --git a/cmd/activity-monitor/main.go b/cmd/activity-monitor/main.go index c84c28936..55c8a64ae 100644 --- a/cmd/activity-monitor/main.go +++ b/cmd/activity-monitor/main.go @@ -115,8 +115,7 @@ func main() { app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { go cmd.DebugServer(c.ActivityMonitor.DebugAddr) - amqpConf := c.ActivityMonitor.AMQP - ch, err := rpc.AmqpChannel(amqpConf) + ch, err := rpc.AmqpChannel(c.ActivityMonitor.AMQP) cmd.FailOnError(err, "Could not connect to AMQP") diff --git a/cmd/admin-revoker/main.go b/cmd/admin-revoker/main.go index d244aee22..0370477b1 100644 --- a/cmd/admin-revoker/main.go +++ b/cmd/admin-revoker/main.go @@ -36,6 +36,8 @@ func loadConfig(c *cli.Context) (config cmd.Config, err error) { return } +const clientName = "AdminRevoker" + func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.AuditLogger, *gorp.DbMap, rpc.StorageAuthorityClient) { c, err := loadConfig(context) cmd.FailOnError(err, "Failed to load Boulder configuration") @@ -43,7 +45,6 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog. stats, auditlogger := cmd.StatsAndLogging(c.Statsd, c.Syslog) amqpConf := c.Revoker.AMQP - clientName := "AdminRevoker" rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create CA client") diff --git a/cmd/boulder-ca/main.go b/cmd/boulder-ca/main.go index 798f14e01..af2f2a28b 100644 --- a/cmd/boulder-ca/main.go +++ b/cmd/boulder-ca/main.go @@ -16,6 +16,8 @@ import ( "github.com/letsencrypt/boulder/sa" ) +const clientName = "CA" + func main() { app := cmd.NewAppShell("boulder-ca", "Handles issuance operations") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { @@ -42,14 +44,13 @@ func main() { go cmd.ProfileCmd("CA", stats) amqpConf := c.CA.AMQP - clientName := "CA" cai.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create SA client") cai.Publisher, err = rpc.NewPublisherClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create Publisher client") - cas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.CA, c.CA.MaxConcurrentRPCServerRequests, stats) + cas, err := rpc.NewAmqpRPCServer(amqpConf, c.CA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create CA RPC server") rpc.NewCertificateAuthorityServer(cas, cai) diff --git a/cmd/boulder-publisher/main.go b/cmd/boulder-publisher/main.go index a0b7bf1b4..37b49176f 100644 --- a/cmd/boulder-publisher/main.go +++ b/cmd/boulder-publisher/main.go @@ -14,6 +14,8 @@ import ( "github.com/letsencrypt/boulder/rpc" ) +const clientName = "Publisher" + func main() { app := cmd.NewAppShell("boulder-publisher", "Submits issued certificates to CT logs") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { @@ -23,13 +25,11 @@ func main() { go cmd.DebugServer(c.Publisher.DebugAddr) go cmd.ProfileCmd("Publisher", stats) - pubConf := c.Publisher - amqpConf := pubConf.AMQP - clientName := "Publisher" + amqpConf := c.Publisher.AMQP pubi.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create SA client") - pubs, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.Publisher, pubConf.MaxConcurrentRPCServerRequests, stats) + pubs, err := rpc.NewAmqpRPCServer(amqpConf, c.Publisher.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create Publisher RPC server") rpc.NewPublisherServer(pubs, &pubi) diff --git a/cmd/boulder-ra/main.go b/cmd/boulder-ra/main.go index e46d6e2dc..034ef94dd 100644 --- a/cmd/boulder-ra/main.go +++ b/cmd/boulder-ra/main.go @@ -20,6 +20,8 @@ import ( "github.com/letsencrypt/boulder/rpc" ) +const clientName = "RA" + func main() { app := cmd.NewAppShell("boulder-ra", "Handles service orchestration") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { @@ -40,7 +42,6 @@ func main() { go cmd.ProfileCmd("RA", stats) amqpConf := c.RA.AMQP - clientName := "RA" vac, err := rpc.NewValidationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create VA client") @@ -70,7 +71,7 @@ func main() { rai.CA = cac rai.SA = sac - ras, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.RA, c.RA.MaxConcurrentRPCServerRequests, stats) + ras, err := rpc.NewAmqpRPCServer(amqpConf, c.RA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create RA RPC server") rpc.NewRegistrationAuthorityServer(ras, rai) diff --git a/cmd/boulder-sa/main.go b/cmd/boulder-sa/main.go index e5f7dd2ad..d14f58c15 100644 --- a/cmd/boulder-sa/main.go +++ b/cmd/boulder-sa/main.go @@ -30,7 +30,7 @@ func main() { go cmd.ProfileCmd("SA", stats) amqpConf := saConf.AMQP - sas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.SA, c.SA.MaxConcurrentRPCServerRequests, stats) + sas, err := rpc.NewAmqpRPCServer(amqpConf, c.SA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create SA RPC server") rpc.NewStorageAuthorityServer(sas, sai) diff --git a/cmd/boulder-va/main.go b/cmd/boulder-va/main.go index 12b09c805..d83aaaa92 100644 --- a/cmd/boulder-va/main.go +++ b/cmd/boulder-va/main.go @@ -18,6 +18,8 @@ import ( "github.com/letsencrypt/boulder/va" ) +const clientName = "VA" + func main() { app := cmd.NewAppShell("boulder-va", "Handles challenge validation") app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) { @@ -51,13 +53,12 @@ func main() { vai.UserAgent = c.VA.UserAgent amqpConf := c.VA.AMQP - clientName := "VA" rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create RA client") vai.RA = rac - vas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.VA, c.VA.MaxConcurrentRPCServerRequests, stats) + vas, err := rpc.NewAmqpRPCServer(amqpConf, c.VA.MaxConcurrentRPCServerRequests, stats) cmd.FailOnError(err, "Unable to create VA RPC server") rpc.NewValidationAuthorityServer(vas, vai) diff --git a/cmd/boulder-wfe/main.go b/cmd/boulder-wfe/main.go index c5e55c031..9b4c79c1e 100644 --- a/cmd/boulder-wfe/main.go +++ b/cmd/boulder-wfe/main.go @@ -22,9 +22,10 @@ import ( "github.com/letsencrypt/boulder/wfe" ) +const clientName = "WFE" + func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (*rpc.RegistrationAuthorityClient, *rpc.StorageAuthorityClient) { amqpConf := c.WFE.AMQP - clientName := "WFE" rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create RA client") diff --git a/cmd/config.go b/cmd/config.go index c320cf7f0..bc799d233 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -183,14 +183,17 @@ type ServiceConfig struct { // AMQPConfig describes how to connect to AMQP, and how to speak to each of the // RPC services we offer via AMQP. type AMQPConfig struct { - Server string - Insecure bool - RA *RPCServerConfig - VA *RPCServerConfig - SA *RPCServerConfig - CA *RPCServerConfig - Publisher *RPCServerConfig - TLS *TLSConfig + Server string + Insecure bool + RA *RPCServerConfig + VA *RPCServerConfig + SA *RPCServerConfig + CA *RPCServerConfig + Publisher *RPCServerConfig + TLS *TLSConfig + // Queue name on which to listen, if this is an RPC service (vs acting only as + // an RPC client). + ServiceQueue string ReconnectTimeouts struct { Base ConfigDuration Max ConfigDuration diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index 5e0736d41..b5b238e76 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -208,6 +208,8 @@ func (ds durationSlice) Swap(a, b int) { ds[a], ds[b] = ds[b], ds[a] } +const clientName = "ExpirationMailer" + func main() { app := cmd.NewAppShell("expiration-mailer", "Sends certificate expiration emails") @@ -233,7 +235,6 @@ func main() { cmd.FailOnError(err, "Could not connect to database") amqpConf := c.SA.AMQP - clientName := "ExpirationMailer" sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Failed to create SA client") diff --git a/cmd/ocsp-updater/main.go b/cmd/ocsp-updater/main.go index ed0d24c94..5cd4085ec 100644 --- a/cmd/ocsp-updater/main.go +++ b/cmd/ocsp-updater/main.go @@ -531,13 +531,14 @@ func (l *looper) loop() error { } } +const clientName = "OCSP" + func setupClients(c cmd.OCSPUpdaterConfig, stats statsd.Statter) ( core.CertificateAuthority, core.Publisher, core.StorageAuthority, ) { amqpConf := c.AMQP - clientName := "OCSP" cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats) cmd.FailOnError(err, "Unable to create CA client") diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index c2dff2fa9..eb0f134e3 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -177,7 +177,7 @@ type AmqpRPCServer struct { // NewAmqpRPCServer creates a new RPC server for the given queue and will begin // consuming requests from the queue. To start the server you must call Start(). -func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, rpcConf *cmd.RPCServerConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) { +func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) { log := blog.GetAuditLogger() reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration @@ -190,8 +190,8 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, rpcConf *cmd.RPCServerConfig, ma } return &AmqpRPCServer{ - serverQueue: rpcConf.Server, - connection: newAMQPConnector(rpcConf.Server, reconnectBase, reconnectMax), + serverQueue: amqpConf.ServiceQueue, + connection: newAMQPConnector(amqpConf.ServiceQueue, reconnectBase, reconnectMax), log: log, dispatchTable: make(map[string]func([]byte) ([]byte, error)), maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests, diff --git a/test/boulder-config.json b/test/boulder-config.json index 2ce281f11..2ddda18fc 100644 --- a/test/boulder-config.json +++ b/test/boulder-config.json @@ -98,7 +98,7 @@ "amqp": { "server": "amqp://guest:guest@localhost:5673", "insecure": true, - "CA": { "server": "CA.server" }, + "serviceQueue": "CA.server", "SA": { "server": "SA.server", "rpcTimeout": "1s" @@ -129,7 +129,7 @@ "amqp": { "server": "amqp://guest:guest@localhost:5673", "insecure": true, - "RA": { "server": "RA.server" }, + "serviceQueue": "RA.server", "VA": { "server": "VA.server", "rpcTimeout": "60s" @@ -153,7 +153,7 @@ "amqp": { "server": "amqp://guest:guest@localhost:5673", "insecure": true, - "SA": { "server": "SA.server" } + "serviceQueue": "SA.server" } }, @@ -169,7 +169,7 @@ "amqp": { "server": "amqp://guest:guest@localhost:5673", "insecure": true, - "VA": { "server": "VA.server" }, + "serviceQueue": "VA.server", "RA": { "server": "RA.server", "rpcTimeout": "1s" @@ -267,7 +267,7 @@ "amqp": { "server": "amqp://guest:guest@localhost:5673", "insecure": true, - "Publisher": { "server": "Publisher.server" }, + "serviceQueue": "Publisher.server", "SA": { "server": "SA.server", "rpcTimeout": "1s" From 3463d7255408e04b560e0617f5c7774c56c4eb10 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Sun, 22 Nov 2015 14:34:23 -0800 Subject: [PATCH 3/3] Replace DialTimeout with ReadTimeout. Generally Dial will be very fast because our resolver is local, so there's no need to override its default of 2s. However, since our resolver recurses more or less every time, getting the answer back is very slow. So we want to be able to set a high ReadTimeout. --- core/dns.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/dns.go b/core/dns.go index 1cd9ba147..5c950135f 100644 --- a/core/dns.go +++ b/core/dns.go @@ -122,11 +122,11 @@ type DNSResolverImpl struct { // NewDNSResolverImpl constructs a new DNS resolver object that utilizes the // provided list of DNS servers for resolution. -func NewDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolverImpl { +func NewDNSResolverImpl(readTimeout time.Duration, servers []string) *DNSResolverImpl { dnsClient := new(dns.Client) // Set timeout for underlying net.Conn - dnsClient.DialTimeout = dialTimeout + dnsClient.ReadTimeout = readTimeout dnsClient.Net = "tcp" return &DNSResolverImpl{ @@ -139,8 +139,8 @@ func NewDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolve // NewTestDNSResolverImpl constructs a new DNS resolver object that utilizes the // provided list of DNS servers for resolution and will allow loopback addresses. // This constructor should *only* be called from tests (unit or integration). -func NewTestDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolverImpl { - resolver := NewDNSResolverImpl(dialTimeout, servers) +func NewTestDNSResolverImpl(readTimeout time.Duration, servers []string) *DNSResolverImpl { + resolver := NewDNSResolverImpl(readTimeout, servers) resolver.allowRestrictedAddresses = true return resolver }