Add comments, move reconnection logic out of binaries (except for the WFE which is a weird case)

This commit is contained in:
Roland Shoemaker 2015-08-03 23:02:52 -07:00
parent 3d7992ae43
commit c9c05cfb46
6 changed files with 81 additions and 74 deletions

View File

@ -47,12 +47,7 @@ func main() {
go cmd.ProfileCmd("CA", stats)
for {
ch, err := cmd.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer {
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, ch)
cmd.FailOnError(err, "Unable to create RPC client")
@ -66,11 +61,12 @@ func main() {
err = rpc.NewCertificateAuthorityServer(cas, cai)
cmd.FailOnError(err, "Unable to create CA server")
auditlogger.Info(app.VersionString())
err = cmd.RunUntilSignaled(auditlogger, cas, closeChan)
cmd.FailOnError(err, "Unable to run CA RPC server")
return cas
}
auditlogger.Info(app.VersionString())
err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger)
cmd.FailOnError(err, "Unable to run CA RPC server")
}
app.Run()

View File

@ -45,12 +45,7 @@ func main() {
go cmd.ProfileCmd("RA", stats)
for {
ch, err := cmd.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer {
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, ch)
cmd.FailOnError(err, "Unable to create RPC client")
@ -78,12 +73,12 @@ func main() {
err = rpc.NewRegistrationAuthorityServer(ras, &rai)
cmd.FailOnError(err, "Unable to create RA server")
auditlogger.Info(app.VersionString())
err = cmd.RunUntilSignaled(auditlogger, ras, closeChan)
cmd.FailOnError(err, "Unable to run RA RPC server")
return ras
}
auditlogger.Info(app.VersionString())
err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger)
cmd.FailOnError(err, "Unable to run RA RPC server")
}
app.Run()

View File

@ -44,22 +44,18 @@ func main() {
go cmd.ProfileCmd("SA", stats)
for {
ch, err := cmd.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer {
sas := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, ch)
err = rpc.NewStorageAuthorityServer(sas, sai)
cmd.FailOnError(err, "Could create SA RPC server")
auditlogger.Info(app.VersionString())
err = cmd.RunUntilSignaled(auditlogger, sas, closeChan)
cmd.FailOnError(err, "Unable to run SA RPC server")
return sas
}
auditlogger.Info(app.VersionString())
err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger)
cmd.FailOnError(err, "Unable to run SA RPC server")
}
app.Run()

View File

@ -43,12 +43,7 @@ func main() {
vai.DNSResolver = core.NewDNSResolverImpl(dnsTimeout, []string{c.Common.DNSResolver})
vai.UserAgent = c.VA.UserAgent
for {
ch, err := cmd.AmqpChannel(c)
cmd.FailOnError(err, "Could not connect to AMQP")
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
connectionHandler := func(ch *amqp.Channel) *rpc.AmqpRPCServer {
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, ch)
cmd.FailOnError(err, "Unable to create RPC client")
@ -62,10 +57,12 @@ func main() {
err = rpc.NewValidationAuthorityServer(vas, &vai)
cmd.FailOnError(err, "Unable to create VA server")
auditlogger.Info(app.VersionString())
err = cmd.RunUntilSignaled(auditlogger, vas, closeChan)
cmd.FailOnError(err, "Unable to run SA RPC server")
return vas
}
auditlogger.Info(app.VersionString())
err = cmd.RunAndReconnectUntilSignaled(connectionHandler, c, auditlogger)
cmd.FailOnError(err, "Unable to run VA RPC server")
}
app.Run()

View File

@ -334,37 +334,47 @@ func RunForever(server *rpc.AmqpRPCServer) {
<-forever
}
// RunUntilSignaled starts the server and run until we get something on closeChan
func RunUntilSignaled(logger *blog.AuditLogger, server *rpc.AmqpRPCServer, closeChan chan *amqp.Error) error {
finishedProcessing, err := server.Start()
if err != nil {
return err
}
stopWatching, err := server.HandleInterrupts()
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "Server running...\n")
// Block until channel closes
// RunAndReconnectUntilSignaled
func RunAndReconnectUntilSignaled(connectionHandler func(ch *amqp.Channel) *rpc.AmqpRPCServer, c Config, logger *blog.AuditLogger) error {
for {
finished := true
select {
case err := <-closeChan:
logger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err))
stopWatching <- true
time.Sleep(time.Second * 5)
logger.Warning("Reconnecting to AMQP...")
finished = true
case <-finishedProcessing:
logger.Info(" [!] Finished processing remaining messages, exiting")
os.Exit(0)
ch, err := AmqpChannel(c)
if err != nil {
return err
}
if finished {
break
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
server := connectionHandler(ch)
finishedProcessing, err := server.Start()
if err != nil {
return err
}
stopWatching, err := server.HandleInterrupts()
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "Server running...\n")
// Block until channel closes
for {
finished := false
select {
case err := <-closeChan:
logger.Warning(fmt.Sprintf("AMQP Channel closed, will reconnect in 5 seconds: [%s]", err))
stopWatching <- true
time.Sleep(time.Second * 5)
logger.Warning("Reconnecting to AMQP...")
finished = true
case <-finishedProcessing:
logger.Info(" [!] Finished processing remaining messages, exiting")
os.Exit(0)
}
if finished {
break
}
}
}
return nil
}
// ProfileCmd runs forever, sending Go statistics to StatsD.

View File

@ -97,6 +97,16 @@ func AMQPDeclareExchange(conn *amqp.Connection) error {
return err
}
// Create a quick consumer ID we can recreate later if we need to cancel
// a consumer.
func getConsumerName(serverQueue string) (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", err
}
return fmt.Sprintf("%s.%s", serverQueue, hostname), nil
}
// A simplified way to declare and subscribe to an AMQP queue
func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan amqp.Delivery, error) {
var err error
@ -126,12 +136,14 @@ func amqpSubscribe(ch *amqp.Channel, name string, log *blog.AuditLogger) (<-chan
return nil, err
}
hostname, err := os.Hostname()
consumerName, err := getConsumerName(name)
if err != nil {
return nil, err
}
consumerName := fmt.Sprintf("%s.%s", name, hostname)
// A consumer name is used so that the specific consumer can be cancelled later
// if signalled. If no name is used a UID is used which cannot be retrieved (as
// far as I can tell).
msgs, err := ch.Consume(
name,
consumerName,
@ -247,8 +259,9 @@ type RPCResponse struct {
Error RPCError `json:"error,omitempty"`
}
// Start starts the AMQP-RPC server running in a separate thread.
// There is currently no Stop() method.
// Start starts the AMQP-RPC server running in a separate thread. A channel, finished,
// is returned so that the caller can detect when the for/range messages loop has
// broke in the spawned thread so we can cleanly exit.
func (rpc *AmqpRPCServer) Start() (finished chan bool, err error) {
msgs, err := amqpSubscribe(rpc.channel, rpc.serverQueue, rpc.log)
if err != nil {
@ -292,19 +305,19 @@ func (rpc *AmqpRPCServer) Start() (finished chan bool, err error) {
return
}
// HandleInterrupts creates a Goroutine to sit and watch for INT and TERM signals
// HandleInterrupts creates a Goroutine to sit and watch for INT, TERM, or HUP signals
// and cancel the server consumer if it sees one so servers can gracefully shutdown.
func (rpc *AmqpRPCServer) HandleInterrupts() (chan bool, error) {
stopWatching := make(chan bool, 1)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGHUP)
hostname, err := os.Hostname()
consumerName, err := getConsumerName(rpc.serverQueue)
if err != nil {
return nil, err
}
consumerName := fmt.Sprintf("%s.%s", rpc.serverQueue, hostname)
go func() {
finished := false