Make ServiceQueue a separate config param.

Also, make clientName strings into constants.
This commit is contained in:
Jacob Hoffman-Andrews 2015-11-18 17:40:45 -08:00
parent 443af63762
commit 5fb7be64b0
13 changed files with 42 additions and 33 deletions

View File

@ -115,8 +115,7 @@ func main() {
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
go cmd.DebugServer(c.ActivityMonitor.DebugAddr)
amqpConf := c.ActivityMonitor.AMQP
ch, err := rpc.AmqpChannel(amqpConf)
ch, err := rpc.AmqpChannel(c.ActivityMonitor.AMQP)
cmd.FailOnError(err, "Could not connect to AMQP")

View File

@ -36,6 +36,8 @@ func loadConfig(c *cli.Context) (config cmd.Config, err error) {
return
}
const clientName = "AdminRevoker"
func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.AuditLogger, *gorp.DbMap, rpc.StorageAuthorityClient) {
c, err := loadConfig(context)
cmd.FailOnError(err, "Failed to load Boulder configuration")
@ -43,7 +45,6 @@ func setupContext(context *cli.Context) (rpc.RegistrationAuthorityClient, *blog.
stats, auditlogger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
amqpConf := c.Revoker.AMQP
clientName := "AdminRevoker"
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create CA client")

View File

@ -16,6 +16,8 @@ import (
"github.com/letsencrypt/boulder/sa"
)
const clientName = "CA"
func main() {
app := cmd.NewAppShell("boulder-ca", "Handles issuance operations")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -42,14 +44,13 @@ func main() {
go cmd.ProfileCmd("CA", stats)
amqpConf := c.CA.AMQP
clientName := "CA"
cai.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create SA client")
cai.Publisher, err = rpc.NewPublisherClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create Publisher client")
cas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.CA, c.CA.MaxConcurrentRPCServerRequests, stats)
cas, err := rpc.NewAmqpRPCServer(amqpConf, c.CA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create CA RPC server")
rpc.NewCertificateAuthorityServer(cas, cai)

View File

@ -14,6 +14,8 @@ import (
"github.com/letsencrypt/boulder/rpc"
)
const clientName = "Publisher"
func main() {
app := cmd.NewAppShell("boulder-publisher", "Submits issued certificates to CT logs")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -23,13 +25,11 @@ func main() {
go cmd.DebugServer(c.Publisher.DebugAddr)
go cmd.ProfileCmd("Publisher", stats)
pubConf := c.Publisher
amqpConf := pubConf.AMQP
clientName := "Publisher"
amqpConf := c.Publisher.AMQP
pubi.SA, err = rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create SA client")
pubs, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.Publisher, pubConf.MaxConcurrentRPCServerRequests, stats)
pubs, err := rpc.NewAmqpRPCServer(amqpConf, c.Publisher.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create Publisher RPC server")
rpc.NewPublisherServer(pubs, &pubi)

View File

@ -20,6 +20,8 @@ import (
"github.com/letsencrypt/boulder/rpc"
)
const clientName = "RA"
func main() {
app := cmd.NewAppShell("boulder-ra", "Handles service orchestration")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -40,7 +42,6 @@ func main() {
go cmd.ProfileCmd("RA", stats)
amqpConf := c.RA.AMQP
clientName := "RA"
vac, err := rpc.NewValidationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create VA client")
@ -70,7 +71,7 @@ func main() {
rai.CA = cac
rai.SA = sac
ras, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.RA, c.RA.MaxConcurrentRPCServerRequests, stats)
ras, err := rpc.NewAmqpRPCServer(amqpConf, c.RA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create RA RPC server")
rpc.NewRegistrationAuthorityServer(ras, rai)

View File

@ -30,7 +30,7 @@ func main() {
go cmd.ProfileCmd("SA", stats)
amqpConf := saConf.AMQP
sas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.SA, c.SA.MaxConcurrentRPCServerRequests, stats)
sas, err := rpc.NewAmqpRPCServer(amqpConf, c.SA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create SA RPC server")
rpc.NewStorageAuthorityServer(sas, sai)

View File

@ -18,6 +18,8 @@ import (
"github.com/letsencrypt/boulder/va"
)
const clientName = "VA"
func main() {
app := cmd.NewAppShell("boulder-va", "Handles challenge validation")
app.Action = func(c cmd.Config, stats statsd.Statter, auditlogger *blog.AuditLogger) {
@ -51,13 +53,12 @@ func main() {
vai.UserAgent = c.VA.UserAgent
amqpConf := c.VA.AMQP
clientName := "VA"
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create RA client")
vai.RA = rac
vas, err := rpc.NewAmqpRPCServer(amqpConf, amqpConf.VA, c.VA.MaxConcurrentRPCServerRequests, stats)
vas, err := rpc.NewAmqpRPCServer(amqpConf, c.VA.MaxConcurrentRPCServerRequests, stats)
cmd.FailOnError(err, "Unable to create VA RPC server")
rpc.NewValidationAuthorityServer(vas, vai)

View File

@ -22,9 +22,10 @@ import (
"github.com/letsencrypt/boulder/wfe"
)
const clientName = "WFE"
func setupWFE(c cmd.Config, logger *blog.AuditLogger, stats statsd.Statter) (*rpc.RegistrationAuthorityClient, *rpc.StorageAuthorityClient) {
amqpConf := c.WFE.AMQP
clientName := "WFE"
rac, err := rpc.NewRegistrationAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create RA client")

View File

@ -183,14 +183,17 @@ type ServiceConfig struct {
// AMQPConfig describes how to connect to AMQP, and how to speak to each of the
// RPC services we offer via AMQP.
type AMQPConfig struct {
Server string
Insecure bool
RA *RPCServerConfig
VA *RPCServerConfig
SA *RPCServerConfig
CA *RPCServerConfig
Publisher *RPCServerConfig
TLS *TLSConfig
Server string
Insecure bool
RA *RPCServerConfig
VA *RPCServerConfig
SA *RPCServerConfig
CA *RPCServerConfig
Publisher *RPCServerConfig
TLS *TLSConfig
// Queue name on which to listen, if this is an RPC service (vs acting only as
// an RPC client).
ServiceQueue string
ReconnectTimeouts struct {
Base ConfigDuration
Max ConfigDuration

View File

@ -208,6 +208,8 @@ func (ds durationSlice) Swap(a, b int) {
ds[a], ds[b] = ds[b], ds[a]
}
const clientName = "ExpirationMailer"
func main() {
app := cmd.NewAppShell("expiration-mailer", "Sends certificate expiration emails")
@ -233,7 +235,6 @@ func main() {
cmd.FailOnError(err, "Could not connect to database")
amqpConf := c.SA.AMQP
clientName := "ExpirationMailer"
sac, err := rpc.NewStorageAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Failed to create SA client")

View File

@ -531,13 +531,14 @@ func (l *looper) loop() error {
}
}
const clientName = "OCSP"
func setupClients(c cmd.OCSPUpdaterConfig, stats statsd.Statter) (
core.CertificateAuthority,
core.Publisher,
core.StorageAuthority,
) {
amqpConf := c.AMQP
clientName := "OCSP"
cac, err := rpc.NewCertificateAuthorityClient(clientName, amqpConf, stats)
cmd.FailOnError(err, "Unable to create CA client")

View File

@ -177,7 +177,7 @@ type AmqpRPCServer struct {
// NewAmqpRPCServer creates a new RPC server for the given queue and will begin
// consuming requests from the queue. To start the server you must call Start().
func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, rpcConf *cmd.RPCServerConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) {
func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, maxConcurrentRPCServerRequests int64, stats statsd.Statter) (*AmqpRPCServer, error) {
log := blog.GetAuditLogger()
reconnectBase := amqpConf.ReconnectTimeouts.Base.Duration
@ -190,8 +190,8 @@ func NewAmqpRPCServer(amqpConf *cmd.AMQPConfig, rpcConf *cmd.RPCServerConfig, ma
}
return &AmqpRPCServer{
serverQueue: rpcConf.Server,
connection: newAMQPConnector(rpcConf.Server, reconnectBase, reconnectMax),
serverQueue: amqpConf.ServiceQueue,
connection: newAMQPConnector(amqpConf.ServiceQueue, reconnectBase, reconnectMax),
log: log,
dispatchTable: make(map[string]func([]byte) ([]byte, error)),
maxConcurrentRPCServerRequests: maxConcurrentRPCServerRequests,

View File

@ -98,7 +98,7 @@
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"CA": { "server": "CA.server" },
"serviceQueue": "CA.server",
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"
@ -129,7 +129,7 @@
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"RA": { "server": "RA.server" },
"serviceQueue": "RA.server",
"VA": {
"server": "VA.server",
"rpcTimeout": "60s"
@ -153,7 +153,7 @@
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"SA": { "server": "SA.server" }
"serviceQueue": "SA.server"
}
},
@ -169,7 +169,7 @@
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"VA": { "server": "VA.server" },
"serviceQueue": "VA.server",
"RA": {
"server": "RA.server",
"rpcTimeout": "1s"
@ -267,7 +267,7 @@
"amqp": {
"server": "amqp://guest:guest@localhost:5673",
"insecure": true,
"Publisher": { "server": "Publisher.server" },
"serviceQueue": "Publisher.server",
"SA": {
"server": "SA.server",
"rpcTimeout": "1s"