Merge branch 'master' into rpc_problem_details

This commit is contained in:
Roland Bracewell Shoemaker 2015-11-23 11:12:14 -08:00
commit ef54e932b5
20 changed files with 327 additions and 263 deletions

View File

@ -115,7 +115,7 @@ func main() {
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
go cmd.DebugServer(c.ActivityMonitor.DebugAddr)
ch, err := rpc.AmqpChannel(c)
ch, err := rpc.AmqpChannel(c.ActivityMonitor.AMQP)
cmd.FailOnError(err, "Could not connect to AMQP")

View File

@ -36,28 +36,25 @@ func loadConfig(c *cli.Context) (config cmd.Config, err error) {
return
}
const clientName = "AdminRevoker"
func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.AuditLogger, *gorp.DbMap, rpc.StorageAuthorityClient) {
c, err := loadConfig(context)
cmd.FailOnError(err, "Failed to load Boulder configuration")
stats, auditlogger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
raRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->RA", c.AMQP.RA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
amqpConf := c.Revoker.AMQP
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create CA client")
dbMap, err := sa.NewDbMap(c.Revoker.DBConnect)
cmd.FailOnError(err, "Couldn't setup database connection")
saRPC, err := rpc.NewAmqpRPCClient("AdminRevoker->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create SA client")
return rac, auditlogger, dbMap, sac
return *rac, auditlogger, dbMap, *sac
}
func addDeniedNames(tx *gorp.Transaction, names []string) (err error) {

View File

@ -16,6 +16,8 @@ import (
"github.com/letsencrypt/boulder/sa"
)
const clientName = "CA"
func main() {
app := cmd.NewAppShell("boulder-ca", "Handles issuance operations")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -41,26 +43,18 @@ func main() {
go cmd.ProfileCmd("CA", stats)
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
amqpConf := c.CA.AMQP
cai.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create SA client")
pubRPC, err := rpc.NewAmqpRPCClient("CA->Publisher", c.AMQP.Publisher.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
pubc, err := rpc.NewPublisherClient(pubRPC)
cai.Publisher, err = rpc.NewPublisherClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create Publisher client")
cai.Publisher = &pubc
cai.SA = &sac
cas, err := rpc.NewAmqpRPCServer(c.AMQP.CA.Server, c.CA.MaxConcurrentRPCServerRequests, c)
cas, err := rpc.NewAmqpRPCServer(amqpConf, c.CA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create CA RPC server")
rpc.NewCertificateAuthorityServer(cas, cai)
err = cas.Start(c)
err = cas.Start(amqpConf)
cmd.FailOnError(err, "Unable to run CA RPC server")
}

View File

@ -14,6 +14,8 @@ import (
"github.com/letsencrypt/boulder/rpc"
)
const clientName = "Publisher"
func main() {
app := cmd.NewAppShell("boulder-publisher", "Submits issued certificates to CT logs")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -23,19 +25,15 @@ func main() {
go cmd.DebugServer(c.Publisher.DebugAddr)
go cmd.ProfileCmd("Publisher", stats)
saRPC, err := rpc.NewAmqpRPCClient("Publisher->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create SA RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
amqpConf := c.Publisher.AMQP
pubi.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
pubi.SA = &sac
pubs, err := rpc.NewAmqpRPCServer(c.AMQP.Publisher.Server, c.Publisher.MaxConcurrentRPCServerRequests, c)
pubs, err := rpc.NewAmqpRPCServer(amqpConf, c.Publisher.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create Publisher RPC server")
rpc.NewPublisherServer(pubs, &pubi)
err = pubs.Start(c)
err = pubs.Start(amqpConf)
cmd.FailOnError(err, "Unable to run Publisher RPC server")
}

View File

@ -20,6 +20,8 @@ import (
"github.com/letsencrypt/boulder/rpc"
)
const clientName = "RA"
func main() {
app := cmd.NewAppShell("boulder-ra", "Handles service orchestration")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -39,27 +41,19 @@ func main() {
go cmd.ProfileCmd("RA", stats)
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
caRPC, err := rpc.NewAmqpRPCClient("RA->CA", c.AMQP.CA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
saRPC, err := rpc.NewAmqpRPCClient("RA->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
vac, err := rpc.NewValidationAuthorityClient(vaRPC)
amqpConf := c.RA.AMQP
vac, err := rpc.NewValidationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create VA client")
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create CA client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
var dc *ra.DomainCheck
if c.RA.UseIsSafeDomain {
dc = &ra.DomainCheck{VA: &vac}
dc = &ra.DomainCheck{VA: vac}
}
rai := ra.NewRegistrationAuthorityImpl(clock.Default(), auditlogger, stats,
@ -73,15 +67,15 @@ func main() {
rai.DNSResolver = core.NewTestDNSResolverImpl(raDNSTimeout, []string{c.Common.DNSResolver})
}
rai.VA = &vac
rai.CA = &cac
rai.SA = &sac
rai.VA = vac
rai.CA = cac
rai.SA = sac
ras, err := rpc.NewAmqpRPCServer(c.AMQP.RA.Server, c.RA.MaxConcurrentRPCServerRequests, c)
ras, err := rpc.NewAmqpRPCServer(amqpConf, c.RA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create RA RPC server")
rpc.NewRegistrationAuthorityServer(ras, rai)
err = ras.Start(c)
err = ras.Start(amqpConf)
cmd.FailOnError(err, "Unable to run RA RPC server")
}

View File

@ -17,9 +17,10 @@ import (
func main() {
app := cmd.NewAppShell("boulder-sa", "Handles SQL operations")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
go cmd.DebugServer(c.SA.DebugAddr)
saConf := c.SA
go cmd.DebugServer(saConf.DebugAddr)
dbMap, err := sa.NewDbMap(c.SA.DBConnect)
dbMap, err := sa.NewDbMap(saConf.DBConnect)
cmd.FailOnError(err, "Couldn't connect to SA database")
sai, err := sa.NewSQLStorageAuthority(dbMap, clock.Default())
@ -28,11 +29,12 @@ func main() {
go cmd.ProfileCmd("SA", stats)
sas, err := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, c.SA.MaxConcurrentRPCServerRequests, c)
amqpConf := saConf.AMQP
sas, err := rpc.NewAmqpRPCServer(amqpConf, c.SA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create SA RPC server")
rpc.NewStorageAuthorityServer(sas, sai)
err = sas.Start(c)
err = sas.Start(amqpConf)
cmd.FailOnError(err, "Unable to run SA RPC server")
}

View File

@ -18,6 +18,8 @@ import (
"github.com/letsencrypt/boulder/va"
)
const clientName = "VA"
func main() {
app := cmd.NewAppShell("boulder-va", "Handles challenge validation")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -50,19 +52,17 @@ func main() {
}
vai.UserAgent = c.VA.UserAgent
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
amqpConf := c.VA.AMQP
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create RA client")
vai.RA = &rac
vai.RA = rac
vas, err := rpc.NewAmqpRPCServer(c.AMQP.VA.Server, c.VA.MaxConcurrentRPCServerRequests, c)
vas, err := rpc.NewAmqpRPCServer(amqpConf, c.VA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create VA RPC server")
rpc.NewValidationAuthorityServer(vas, vai)
err = vas.Start(c)
err = vas.Start(amqpConf)
cmd.FailOnError(err, "Unable to run VA RPC server")
}

View File

@ -22,17 +22,14 @@ import (
"github.com/letsencrypt/boulder/wfe"
)
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient) {
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
const clientName = "WFE"
saRPC, err := rpc.NewAmqpRPCClient("WFE->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
rac, err := rpc.NewRegistrationAuthorityClient(raRPC)
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (*rpc.RegistrationAuthorityClient, *rpc.StorageAuthorityClient) {
amqpConf := c.WFE.AMQP
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create RA client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
return rac, sac
@ -59,8 +56,8 @@ func main() {
wfe, err := wfe.NewWebFrontEndImpl(stats, clock.Default())
cmd.FailOnError(err, "Unable to create WFE")
rac, sac := setupWFE(c, auditlogger, stats)
wfe.RA = &rac
wfe.SA = &sac
wfe.RA = rac
wfe.SA = sac
wfe.SubscriberAgreementURL = c.SubscriberAgreementURL
wfe.AllowOrigins = c.WFE.AllowOrigins

View File

@ -24,28 +24,15 @@ import (
// Note: NO DEFAULTS are provided.
type Config struct {
ActivityMonitor struct {
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
ServiceConfig
}
// General
AMQP struct {
Server string
Insecure bool
RA Queue
VA Queue
SA Queue
CA Queue
OCSP Queue
Publisher Queue
TLS *TLSConfig
ReconnectTimeouts struct {
Base ConfigDuration
Max ConfigDuration
}
}
// Default AMQPConfig for services that don't specify one.
// TODO(jsha): Delete this after a deploy.
AMQP *AMQPConfig
WFE struct {
ServiceConfig
BaseURL string
ListenAddress string
@ -58,19 +45,13 @@ type Config struct {
ShutdownStopTimeout string
ShutdownKillTimeout string
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
CA CAConfig
Monolith struct {
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
RA struct {
ServiceConfig
RateLimitPoliciesFilename string
MaxConcurrentRPCServerRequests int64
@ -79,21 +60,19 @@ type Config struct {
// UseIsSafeDomain determines whether to call VA.IsSafeDomain
UseIsSafeDomain bool // TODO(jmhodges): remove after va IsSafeDomain deploy
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
SA struct {
ServiceConfig
DBConnect string
MaxConcurrentRPCServerRequests int64
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
VA struct {
ServiceConfig
UserAgent string
PortConfig va.PortConfig
@ -101,9 +80,6 @@ type Config struct {
MaxConcurrentRPCServerRequests int64
GoogleSafeBrowsing *GoogleSafeBrowsingConfig
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
SQL struct {
@ -116,9 +92,14 @@ type Config struct {
Revoker struct {
DBConnect string
// The revoker isn't a long running service, so doesn't get a full
// ServiceConfig, just an AMQPConfig.
AMQP *AMQPConfig
}
Mailer struct {
ServiceConfig
Server string
Port string
Username string
@ -134,12 +115,11 @@ type Config struct {
NagCheckInterval string
// Path to a text/template email template
EmailTemplate string
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
OCSPResponder struct {
ServiceConfig
// Source indicates the source of pre-signed OCSP responses to be used. It
// can be a DBConnect string or a file URL. The file URL style is used
// when responding from a static file for intermediates and roots.
@ -153,18 +133,13 @@ type Config struct {
ShutdownStopTimeout string
ShutdownKillTimeout string
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
OCSPUpdater OCSPUpdaterConfig
Publisher struct {
ServiceConfig
MaxConcurrentRPCServerRequests int64
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
ExternalCertImporter struct {
@ -197,10 +172,40 @@ type Config struct {
SubscriberAgreementURL string
}
// ServiceConfig contains config items that are common to all our services, to
// be embedded in other config structs.
type ServiceConfig struct {
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
AMQP *AMQPConfig
}
// AMQPConfig describes how to connect to AMQP, and how to speak to each of the
// RPC services we offer via AMQP.
type AMQPConfig struct {
Server string
Insecure bool
RA *RPCServerConfig
VA *RPCServerConfig
SA *RPCServerConfig
CA *RPCServerConfig
Publisher *RPCServerConfig
TLS *TLSConfig
// Queue name on which to listen, if this is an RPC service (vs acting only as
// an RPC client).
ServiceQueue string
ReconnectTimeouts struct {
Base ConfigDuration
Max ConfigDuration
}
}
// CAConfig structs have configuration information for the certificate
// authority, including database parameters as well as controls for
// issued certificates.
type CAConfig struct {
ServiceConfig
Profile string
TestMode bool
DBConnect string
@ -219,9 +224,6 @@ type CAConfig struct {
MaxConcurrentRPCServerRequests int64
HSMFaultTimeout ConfigDuration
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
// PAConfig specifies how a policy authority should connect to its
@ -280,14 +282,17 @@ type TLSConfig struct {
CACertFile *string
}
// Queue describes a queue name
type Queue struct {
Server string
// RPCServerConfig contains configuration particular to a specific RPC server
// type (e.g. RA, SA, etc)
type RPCServerConfig struct {
Server string // Queue name where the server receives requests
RPCTimeout ConfigDuration
}
// OCSPUpdaterConfig provides the various window tick times and batch sizes needed
// for the OCSP (and SCT) updater
type OCSPUpdaterConfig struct {
ServiceConfig
DBConnect string
NewCertificateWindow ConfigDuration
@ -312,9 +317,6 @@ type OCSPUpdaterConfig struct {
SignFailureBackoffFactor float64
SignFailureBackoffMax ConfigDuration
// DebugAddr is the address to run the /debug handlers on.
DebugAddr string
}
// GoogleSafeBrowsingConfig is the JSON config struct for the VA's use of the

View File

@ -208,6 +208,8 @@ func (ds durationSlice) Swap(a, b int) {
ds[a], ds[b] = ds[b], ds[a]
}
const clientName = "ExpirationMailer"
func main() {
app := cmd.NewAppShell("expiration-mailer", "Sends certificate expiration emails")
@ -232,10 +234,8 @@ func main() {
dbMap, err := sa.NewDbMap(c.Mailer.DBConnect)
cmd.FailOnError(err, "Could not connect to database")
saRPC, err := rpc.NewAmqpRPCClient("ExpirationMailer->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
amqpConf := c.SA.AMQP
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create SA client")
// Load email template

View File

@ -531,28 +531,22 @@ func (l *looper) loop() error {
}
}
func setupClients(c cmd.Config, stats statsd.Statter) (
const clientName = "OCSP"
func setupClients(c cmd.OCSPUpdaterConfig, stats statsd.Statter) (
core.CertificateAuthority,
core.Publisher,
core.StorageAuthority,
) {
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
cac, err := rpc.NewCertificateAuthorityClient(caRPC)
amqpConf := c.AMQP
cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create CA client")
pubRPC, err := rpc.NewAmqpRPCClient("OCSP->Publisher", c.AMQP.Publisher.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
pubc, err := rpc.NewPublisherClient(pubRPC)
pubc, err := rpc.NewPublisherClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create Publisher client")
saRPC, err := rpc.NewAmqpRPCClient("OCSP->SA", c.AMQP.SA.Server, c, stats)
cmd.FailOnError(err, "Unable to create RPC client")
sac, err := rpc.NewStorageAuthorityClient(saRPC)
cmd.FailOnError(err, "Unable to create Publisher client")
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
return cac, pubc, sac
}
@ -561,14 +555,15 @@ func main() {
app := cmd.NewAppShell("ocsp-updater", "Generates and updates OCSP responses")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
go cmd.DebugServer(c.OCSPUpdater.DebugAddr)
conf := c.OCSPUpdater
go cmd.DebugServer(conf.DebugAddr)
go cmd.ProfileCmd("OCSP-Updater", stats)
// Configure DB
dbMap, err := sa.NewDbMap(c.OCSPUpdater.DBConnect)
dbMap, err := sa.NewDbMap(conf.DBConnect)
cmd.FailOnError(err, "Could not connect to database")
cac, pubc, sac := setupClients(c, stats)
cac, pubc, sac := setupClients(conf, stats)
updater, err := newUpdater(
stats,
@ -578,7 +573,7 @@ func main() {
pubc,
sac,
// Necessary evil for now
c.OCSPUpdater,
conf,
len(c.Common.CT.Logs),
c.Common.IssuerCert,
)

View File

@ -95,6 +95,35 @@ func (as *AppShell) Run() {
config = as.Config(c, config)
}
// Provide default values for each service's AMQP config section.
if config.ActivityMonitor.AMQP == nil {
config.ActivityMonitor.AMQP = config.AMQP
}
if config.WFE.AMQP == nil {
config.WFE.AMQP = config.AMQP
}
if config.CA.AMQP == nil {
config.CA.AMQP = config.AMQP
}
if config.RA.AMQP == nil {
config.RA.AMQP = config.AMQP
}
if config.SA.AMQP == nil {
config.SA.AMQP = config.AMQP
}
if config.VA.AMQP == nil {
config.VA.AMQP = config.AMQP
}
if config.Mailer.AMQP == nil {
config.Mailer.AMQP = config.AMQP
}
if config.OCSPResponder.AMQP == nil {
config.OCSPResponder.AMQP = config.AMQP
}
if config.Publisher.AMQP == nil {
config.Publisher.AMQP = config.AMQP
}
stats, auditlogger := StatsAndLogging(config.Statsd, config.Syslog)
auditlogger.Info(as.VersionString())

View File

@ -122,11 +122,11 @@ type DNSResolverImpl struct {
// NewDNSResolverImpl constructs a new DNS resolver object that utilizes the
// provided list of DNS servers for resolution.
func NewDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolverImpl {
func NewDNSResolverImpl(readTimeout time.Duration, servers []string) *DNSResolverImpl {
dnsClient := new(dns.Client)
// Set timeout for underlying net.Conn
dnsClient.DialTimeout = dialTimeout
dnsClient.ReadTimeout = readTimeout
dnsClient.Net = "tcp"
return &DNSResolverImpl{
@ -139,8 +139,8 @@ func NewDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolve
// NewTestDNSResolverImpl constructs a new DNS resolver object that utilizes the
// provided list of DNS servers for resolution and will allow loopback addresses.
// This constructor should *only* be called from tests (unit or integration).
func NewTestDNSResolverImpl(dialTimeout time.Duration, servers []string) *DNSResolverImpl {
resolver := NewDNSResolverImpl(dialTimeout, servers)
func NewTestDNSResolverImpl(readTimeout time.Duration, servers []string) *DNSResolverImpl {
resolver := NewDNSResolverImpl(readTimeout, servers)
resolver.allowRestrictedAddresses = true
return resolver
}

View File

@ -178,26 +178,21 @@ type AmqpRPCServer struct {
// NewAmqpRPCServer creates a new RPC server for the given queue and will begin
// consuming requests from the queue. To start the server you must call Start().
func NewAmqpRPCServer(serverQueue string, maxConcurrentRPCServerRequests int64, c cmd.Config) (*AmqpRPCServer, error) {
func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) {
log := blog.GetAuditLogger()
reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration
reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration
if reconnectBase == 0 {
reconnectBase = 20 * time.Millisecond
}
reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration
reconnectMax := amqpConf.ReconnectTimeouts.Max.Duration
if reconnectMax == 0 {
reconnectMax = time.Minute
}
stats, err := statsd.NewClient(c.Statsd.Server, c.Statsd.Prefix)
if err != nil {
return nil, err
}
return &AmqpRPCServer{
serverQueue: serverQueue,
connection: newAMQPConnector(serverQueue, reconnectBase, reconnectMax),
serverQueue: amqpConf.ServiceQueue,
connection: newAMQPConnector(amqpConf.ServiceQueue, reconnectBase, reconnectMax),
log: log,
dispatchTable: make(map[string]func([]byte) ([]byte, error)),
maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests,
@ -328,25 +323,25 @@ func (r rpcResponse) debugString() string {
}
// AmqpChannel sets a AMQP connection up using SSL if configuration is provided
func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) {
var conn *amqp.Connection
var err error
log := blog.GetAuditLogger()
if conf.AMQP.Insecure == true {
if conf.Insecure == true {
// If the Insecure flag is true, then just go ahead and connect
conn, err = amqp.Dial(conf.AMQP.Server)
conn, err = amqp.Dial(conf.Server)
} else {
// The insecure flag is false or not set, so we need to load up the options
log.Info("AMQPS: Loading TLS Options.")
if strings.HasPrefix(conf.AMQP.Server, "amqps") == false {
if strings.HasPrefix(conf.Server, "amqps") == false {
err = fmt.Errorf("AMQPS: Not using an AMQPS URL. To use AMQP instead of AMQPS, set insecure=true")
return nil, err
}
if conf.AMQP.TLS == nil {
if conf.TLS == nil {
err = fmt.Errorf("AMQPS: No TLS configuration provided. To use AMQP instead of AMQPS, set insecure=true")
return nil, err
}
@ -354,14 +349,14 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
cfg := new(tls.Config)
// If the configuration specified a certificate (or key), load them
if conf.AMQP.TLS.CertFile != nil || conf.AMQP.TLS.KeyFile != nil {
if conf.TLS.CertFile != nil || conf.TLS.KeyFile != nil {
// But they have to give both.
if conf.AMQP.TLS.CertFile == nil || conf.AMQP.TLS.KeyFile == nil {
if conf.TLS.CertFile == nil || conf.TLS.KeyFile == nil {
err = fmt.Errorf("AMQPS: You must set both of the configuration values AMQP.TLS.KeyFile and AMQP.TLS.CertFile")
return nil, err
}
cert, err := tls.LoadX509KeyPair(*conf.AMQP.TLS.CertFile, *conf.AMQP.TLS.KeyFile)
cert, err := tls.LoadX509KeyPair(*conf.TLS.CertFile, *conf.TLS.KeyFile)
if err != nil {
err = fmt.Errorf("AMQPS: Could not load Client Certificate or Key: %s", err)
return nil, err
@ -373,10 +368,10 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
// If the configuration specified a CA certificate, make it the only
// available root.
if conf.AMQP.TLS.CACertFile != nil {
if conf.TLS.CACertFile != nil {
cfg.RootCAs = x509.NewCertPool()
ca, err := ioutil.ReadFile(*conf.AMQP.TLS.CACertFile)
ca, err := ioutil.ReadFile(*conf.TLS.CACertFile)
if err != nil {
err = fmt.Errorf("AMQPS: Could not load CA Certificate: %s", err)
return nil, err
@ -385,7 +380,7 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
log.Info("AMQPS: Configured CA certificate for AMQPS.")
}
conn, err = amqp.DialTLS(conf.AMQP.Server, cfg)
conn, err = amqp.DialTLS(conf.Server, cfg)
}
if err != nil {
@ -442,7 +437,7 @@ func (rpc *AmqpRPCServer) replyTooManyRequests(msg amqp.Delivery) error {
// Start starts the AMQP-RPC server and handles reconnections, this will block
// until a fatal error is returned or AmqpRPCServer.Stop() is called and all
// remaining messages are processed.
func (rpc *AmqpRPCServer) Start(c cmd.Config) error {
func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error {
tooManyGoroutines := rpcResponse{
Error: wrapError(core.TooManyRPCRequestsError("RPC server has spawned too many Goroutines")),
}
@ -560,7 +555,12 @@ type AmqpRPCCLient struct {
}
// NewAmqpRPCClient constructs an RPC client using AMQP
func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats statsd.Statter) (rpc *AmqpRPCCLient, err error) {
func NewAmqpRPCClient(
clientQueuePrefix string,
amqpConf *cmd.AMQPConfig,
rpcConf *cmd.RPCServerConfig,
stats statsd.Statter,
) (rpc *AmqpRPCCLient, err error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
@ -573,26 +573,31 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats
}
clientQueue := fmt.Sprintf("%s.%s.%x", clientQueuePrefix, hostname, randID)
reconnectBase := c.AMQP.ReconnectTimeouts.Base.Duration
reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration
if reconnectBase == 0 {
reconnectBase = 20 * time.Millisecond
}
reconnectMax := c.AMQP.ReconnectTimeouts.Max.Duration
reconnectMax := amqpConf.ReconnectTimeouts.Max.Duration
if reconnectMax == 0 {
reconnectMax = time.Minute
}
timeout := rpcConf.RPCTimeout.Duration
if timeout == 0 {
timeout = 10 * time.Second
}
rpc = &AmqpRPCCLient{
serverQueue: serverQueue,
serverQueue: rpcConf.Server,
clientQueue: clientQueue,
connection: newAMQPConnector(clientQueue, reconnectBase, reconnectMax),
pending: make(map[string]chan []byte),
timeout: 10 * time.Second,
timeout: timeout,
log: blog.GetAuditLogger(),
stats: stats,
}
err = rpc.connection.connect(c)
err = rpc.connection.connect(amqpConf)
if err != nil {
return nil, err
}
@ -625,7 +630,7 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats
}
case err = <-rpc.connection.closeChannel():
rpc.log.Info(fmt.Sprintf(" [!] Client reply channel closed : %s", rpc.clientQueue))
rpc.connection.reconnect(c, rpc.log)
rpc.connection.reconnect(amqpConf, rpc.log)
}
}
}()
@ -633,12 +638,6 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats
return rpc, err
}
// SetTimeout configures the maximum time DispatchSync will wait for a response
// before returning an error.
func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) {
rpc.timeout = ttl
}
// dispatch sends a body to the destination, and returns the id for the request
// that can be used to correlate it with responses, and a response channel that
// can be used to monitor for responses, or discarded for one-shot actions.

View File

@ -29,12 +29,12 @@ func newAMQPConnector(
// channelMaker encapsulates how to create an AMQP channel.
type channelMaker interface {
makeChannel(conf cmd.Config) (amqpChannel, error)
makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error)
}
type defaultChannelMaker struct{}
func (d defaultChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) {
func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) {
return AmqpChannel(conf)
}
@ -68,7 +68,7 @@ func (ac *amqpConnector) closeChannel() chan *amqp.Error {
// connect attempts to connect to a channel and subscribe to the named queue,
// returning error if it fails. This is used at first startup, where we want to
// fail fast if we can't connect.
func (ac *amqpConnector) connect(config cmd.Config) error {
func (ac *amqpConnector) connect(config *cmd.AMQPConfig) error {
channel, err := ac.chMaker.makeChannel(config)
if err != nil {
return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err)
@ -89,7 +89,7 @@ func (ac *amqpConnector) connect(config cmd.Config) error {
// reconnect attempts repeatedly to connect and subscribe to the named queue. It
// will loop forever until it succeeds. This is used for a running server, where
// we don't want to shut down because we lost our AMQP connection.
func (ac *amqpConnector) reconnect(config cmd.Config, log blog.SyslogWriter) {
func (ac *amqpConnector) reconnect(config *cmd.AMQPConfig, log blog.SyslogWriter) {
for i := 0; ; i++ {
ac.clk.Sleep(core.RetryBackoff(i, ac.retryTimeoutBase, ac.retryTimeoutMax, 2))
log.Info(fmt.Sprintf(" [!] attempting reconnect for %s", ac.queueName))

View File

@ -17,7 +17,7 @@ type mockChannelMaker struct {
channel amqpChannel
}
func (m mockChannelMaker) makeChannel(conf cmd.Config) (amqpChannel, error) {
func (m mockChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) {
return m.channel, nil
}
@ -44,7 +44,7 @@ func TestConnect(t *testing.T) {
mockChannel.EXPECT().QueueBind("fooqueue", "fooqueue", AmqpExchange, false, nil)
mockChannel.EXPECT().Consume("fooqueue", consumerName, AmqpAutoAck, AmqpExclusive, AmqpNoLocal, AmqpNoWait, nil).Return(make(<-chan amqp.Delivery), nil)
mockChannel.EXPECT().NotifyClose(gomock.Any()).Return(make(chan *amqp.Error))
err := ac.connect(cmd.Config{})
err := ac.connect(&cmd.AMQPConfig{})
if err != nil {
t.Fatalf("failed to connect: %s", err)
}
@ -64,7 +64,7 @@ func TestConnectFail(t *testing.T) {
defer finish()
mockChannel.EXPECT().QueueDeclare(
"fooqueue", AmqpDurable, AmqpDeleteUnused, AmqpExclusive, AmqpNoWait, nil).Return(amqp.Queue{}, errors.New("fail"))
err := ac.connect(cmd.Config{})
err := ac.connect(&cmd.AMQPConfig{})
if err == nil {
t.Fatalf("connect should have errored but did not")
}
@ -89,7 +89,7 @@ func TestReconnect(t *testing.T) {
log = mocks.UseMockLog()
ac.reconnect(cmd.Config{}, log)
ac.reconnect(&cmd.AMQPConfig{}, log)
if ac.channel != mockChannel {
t.Errorf("ac.channel was not equal to mockChannel")
}

View File

@ -5,13 +5,8 @@
package rpc
import (
"time"
)
// Client describes the functions an RPC Client performs
type Client interface {
SetTimeout(time.Duration)
DispatchSync(string, []byte) ([]byte, error)
}

View File

@ -13,7 +13,9 @@ import (
"net"
"time"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
jose "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/letsencrypt/go-jose"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
blog "github.com/letsencrypt/boulder/log"
)
@ -362,9 +364,9 @@ type RegistrationAuthorityClient struct {
}
// NewRegistrationAuthorityClient constructs an RPC client
func NewRegistrationAuthorityClient(client Client) (rac RegistrationAuthorityClient, err error) {
rac = RegistrationAuthorityClient{rpc: client}
return
func NewRegistrationAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*RegistrationAuthorityClient, error) {
client, err := NewAmqpRPCClient(clientName+"->RA", amqpConf, amqpConf.RA, stats)
return &RegistrationAuthorityClient{rpc: client}, err
}
// NewRegistration sends a New Registration request
@ -575,9 +577,9 @@ type ValidationAuthorityClient struct {
}
// NewValidationAuthorityClient constructs an RPC client
func NewValidationAuthorityClient(client Client) (vac ValidationAuthorityClient, err error) {
vac = ValidationAuthorityClient{rpc: client}
return
func NewValidationAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*ValidationAuthorityClient, error) {
client, err := NewAmqpRPCClient(clientName+"->VA", amqpConf, amqpConf.VA, stats)
return &ValidationAuthorityClient{rpc: client}, err
}
// UpdateValidations sends an Update Validations request
@ -655,9 +657,9 @@ type PublisherClient struct {
}
// NewPublisherClient constructs an RPC client
func NewPublisherClient(client Client) (pub PublisherClient, err error) {
pub = PublisherClient{rpc: client}
return
func NewPublisherClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*PublisherClient, error) {
client, err := NewAmqpRPCClient(clientName+"->Publisher", amqpConf, amqpConf.Publisher, stats)
return &PublisherClient{rpc: client}, err
}
// SubmitToCT sends a request to submit a certifcate to CT logs
@ -741,9 +743,9 @@ type CertificateAuthorityClient struct {
}
// NewCertificateAuthorityClient constructs an RPC client
func NewCertificateAuthorityClient(client Client) (cac CertificateAuthorityClient, err error) {
cac = CertificateAuthorityClient{rpc: client}
return
func NewCertificateAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*CertificateAuthorityClient, error) {
client, err := NewAmqpRPCClient(clientName+"->CA", amqpConf, amqpConf.CA, stats)
return &CertificateAuthorityClient{rpc: client}, err
}
// IssueCertificate sends a request to issue a certificate
@ -1172,9 +1174,9 @@ type StorageAuthorityClient struct {
}
// NewStorageAuthorityClient constructs an RPC client
func NewStorageAuthorityClient(client Client) (sac StorageAuthorityClient, err error) {
sac = StorageAuthorityClient{rpc: client}
return
func NewStorageAuthorityClient(clientName string, amqpConf *cmd.AMQPConfig, stats statsd.Statter) (*StorageAuthorityClient, error) {
client, err := NewAmqpRPCClient(clientName+"->SA", amqpConf, amqpConf.SA, stats)
return &StorageAuthorityClient{rpc: client}, err
}
// GetRegistration sends a request to get a registration by ID

View File

@ -8,7 +8,6 @@ package rpc
import (
"encoding/json"
"testing"
"time"
jose "github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/letsencrypt/go-jose"
"github.com/letsencrypt/boulder/core"
@ -31,9 +30,6 @@ type MockRPCClient struct {
NextErr error
}
func (rpc *MockRPCClient) SetTimeout(ttl time.Duration) {
}
func (rpc *MockRPCClient) Dispatch(method string, body []byte) chan []byte {
rpc.LastMethod = method
rpc.LastBody = body
@ -63,9 +59,7 @@ func (rpc *MockRPCClient) DispatchSync(method string, body []byte) (response []b
func TestRANewRegistration(t *testing.T) {
mock := &MockRPCClient{}
client, err := NewRegistrationAuthorityClient(mock)
test.AssertNotError(t, err, "Client construction")
test.AssertNotNil(t, client, "Client construction")
client := RegistrationAuthorityClient{mock}
var jwk jose.JsonWebKey
json.Unmarshal([]byte(JWK1JSON), &jwk)
@ -75,7 +69,7 @@ func TestRANewRegistration(t *testing.T) {
Key: jwk,
}
_, err = client.NewRegistration(reg)
_, err := client.NewRegistration(reg)
test.AssertNotError(t, err, "Updated Registration")
test.Assert(t, len(mock.LastBody) > 0, "Didn't send Registration")
test.AssertEquals(t, "NewRegistration", mock.LastMethod)
@ -87,15 +81,13 @@ func TestRANewRegistration(t *testing.T) {
func TestGenerateOCSP(t *testing.T) {
mock := &MockRPCClient{}
client, err := NewCertificateAuthorityClient(mock)
test.AssertNotError(t, err, "Client construction")
test.AssertNotNil(t, client, "Client construction")
client := CertificateAuthorityClient{mock}
req := core.OCSPSigningRequest{
// nope
}
mock.NextResp = []byte{}
_, err = client.GenerateOCSP(req)
_, err := client.GenerateOCSP(req)
test.AssertError(t, err, "Should have failed at signer")
}

View File

@ -5,36 +5,6 @@
"stdoutlevel": 7
},
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"-uncomment_for_AMQPS-tls": {
"cacertfile": "/etc/boulder/rabbitmq-cacert.pem",
"certfile": "/etc/boulder/rabbitmq-cert.pem",
"keyfile": "/etc/boulder/rabbitmq-key.pem"
},
"RA": {
"client": "RA.client",
"server": "RA.server"
},
"VA": {
"client": "VA.client",
"server": "VA.server"
},
"SA": {
"client": "SA.client",
"server": "SA.server"
},
"CA": {
"client": "CA.client",
"server": "CA.server"
},
"Publisher": {
"client": "Publisher.client",
"server": "Publisher.server"
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
@ -49,7 +19,19 @@
"issuerCacheDuration": "48h",
"shutdownStopTimeout": "10s",
"shutdownKillTimeout": "1m",
"debugAddr": "localhost:8000"
"debugAddr": "localhost:8000",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"RA": {
"server": "RA.server",
"rpcTimeout": "1s"
},
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
}
}
},
"ca": {
@ -112,7 +94,20 @@
}
},
"maxConcurrentRPCServerRequests": 16,
"hsmFaultTimeout": "300s"
"hsmFaultTimeout": "300s",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"serviceQueue": "CA.server",
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
},
"Publisher": {
"server": "Publisher.server",
"rpcTimeout": "1s"
}
}
},
"pa": {
@ -130,13 +125,36 @@
"rateLimitPoliciesFilename": "test/rate-limit-policies.yml",
"maxConcurrentRPCServerRequests": 16,
"maxContactsPerRegistration": 100,
"debugAddr": "localhost:8002"
"debugAddr": "localhost:8002",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"serviceQueue": "RA.server",
"VA": {
"server": "VA.server",
"rpcTimeout": "60s"
},
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
},
"CA": {
"server": "CA.server",
"rpcTimeout": "1s"
}
}
},
"sa": {
"dbConnect": "mysql+tcp://sa@localhost:3306/boulder_sa_integration",
"maxConcurrentRPCServerRequests": 16,
"debugAddr": "localhost:8003"
"debugAddr": "localhost:8003",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"serviceQueue": "SA.server"
}
},
"va": {
@ -147,7 +165,16 @@
"httpsPort": 5001,
"tlsPort": 5001
},
"maxConcurrentRPCServerRequests": 16
"maxConcurrentRPCServerRequests": 16,
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"serviceQueue": "VA.server",
"RA": {
"server": "RA.server",
"rpcTimeout": "1s"
}
}
},
"sql": {
@ -155,7 +182,19 @@
},
"revoker": {
"dbConnect": "mysql+tcp://revoker@localhost:3306/boulder_sa_integration"
"dbConnect": "mysql+tcp://revoker@localhost:3306/boulder_sa_integration",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"RA": {
"server": "RA.server",
"rpcTimeout": "1s"
},
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
}
}
},
"ocspResponder": {
@ -182,11 +221,31 @@
"oldestIssuedSCT": "72h",
"signFailureBackoffFactor": 1.2,
"signFailureBackoffMax": "30m",
"debugAddr": "localhost:8006"
"debugAddr": "localhost:8006",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
},
"CA": {
"server": "CA.server",
"rpcTimeout": "1s"
},
"Publisher": {
"server": "Publisher.server",
"rpcTimeout": "1s"
}
}
},
"activityMonitor": {
"debugAddr": "localhost:8007"
"debugAddr": "localhost:8007",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true
}
},
"mailer": {
@ -204,7 +263,16 @@
"publisher": {
"maxConcurrentRPCServerRequests": 16,
"debugAddr": "localhost:8009"
"debugAddr": "localhost:8009",
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"serviceQueue": "Publisher.server",
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
}
}
},
"common": {