Support TLS mutual authentication for AMQPS
This commit is contained in:
parent
4e7818ac7f
commit
f95e9eaa83
|
|
@ -148,7 +148,9 @@ func main() {
|
||||||
|
|
||||||
blog.SetAuditLogger(auditlogger)
|
blog.SetAuditLogger(auditlogger)
|
||||||
|
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
go cmd.ProfileCmd("AM", stats)
|
go cmd.ProfileCmd("AM", stats)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,8 @@ func setupContext(context *cli.Context) (rpc.CertificateAuthorityClient, *blog.A
|
||||||
cmd.FailOnError(err, "Could not connect to Syslog")
|
cmd.FailOnError(err, "Could not connect to Syslog")
|
||||||
blog.SetAuditLogger(auditlogger)
|
blog.SetAuditLogger(auditlogger)
|
||||||
|
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
caRPC, err := rpc.NewAmqpRPCClient("revoker->CA", c.AMQP.CA.Server, ch)
|
caRPC, err := rpc.NewAmqpRPCClient("revoker->CA", c.AMQP.CA.Server, ch)
|
||||||
cmd.FailOnError(err, "Unable to create RPC client")
|
cmd.FailOnError(err, "Unable to create RPC client")
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,9 @@ func main() {
|
||||||
go cmd.ProfileCmd("CA", stats)
|
go cmd.ProfileCmd("CA", stats)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, ch)
|
saRPC, err := rpc.NewAmqpRPCClient("CA->SA", c.AMQP.SA.Server, ch)
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,9 @@ func main() {
|
||||||
go cmd.ProfileCmd("RA", stats)
|
go cmd.ProfileCmd("RA", stats)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, ch)
|
vaRPC, err := rpc.NewAmqpRPCClient("RA->VA", c.AMQP.VA.Server, ch)
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,9 @@ func main() {
|
||||||
go cmd.ProfileCmd("SA", stats)
|
go cmd.ProfileCmd("SA", stats)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
sas := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, ch)
|
sas := rpc.NewAmqpRPCServer(c.AMQP.SA.Server, ch)
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,9 @@ func main() {
|
||||||
vai.DNSResolver = core.NewDNSResolver(dnsTimeout, []string{c.VA.DNSResolver})
|
vai.DNSResolver = core.NewDNSResolver(dnsTimeout, []string{c.VA.DNSResolver})
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, ch)
|
raRPC, err := rpc.NewAmqpRPCClient("VA->RA", c.AMQP.RA.Server, ch)
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupWFE(c cmd.Config) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) {
|
func setupWFE(c cmd.Config) (rpc.RegistrationAuthorityClient, rpc.StorageAuthorityClient, chan *amqp.Error) {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch)
|
raRPC, err := rpc.NewAmqpRPCClient("WFE->RA", c.AMQP.RA.Server, ch)
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,9 @@ import (
|
||||||
const ocspResponseLimit int = 128
|
const ocspResponseLimit int = 128
|
||||||
|
|
||||||
func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) {
|
func setupClients(c cmd.Config) (rpc.CertificateAuthorityClient, chan *amqp.Error) {
|
||||||
ch := cmd.AmqpChannel(c.AMQP.Server)
|
ch, err := cmd.AmqpChannel(c)
|
||||||
|
cmd.FailOnError(err, "Could not connect to AMQP")
|
||||||
|
|
||||||
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
closeChan := ch.NotifyClose(make(chan *amqp.Error, 1))
|
||||||
|
|
||||||
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch)
|
caRPC, err := rpc.NewAmqpRPCClient("OCSP->CA", c.AMQP.CA.Server, ch)
|
||||||
|
|
|
||||||
63
cmd/shell.go
63
cmd/shell.go
|
|
@ -22,6 +22,8 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
@ -29,6 +31,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
|
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/cactus/go-statsd-client/statsd"
|
||||||
|
|
@ -54,6 +57,7 @@ type Config struct {
|
||||||
SA Queue
|
SA Queue
|
||||||
CA Queue
|
CA Queue
|
||||||
OCSP Queue
|
OCSP Queue
|
||||||
|
SSL *SSLConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
WFE struct {
|
WFE struct {
|
||||||
|
|
@ -125,6 +129,13 @@ type Config struct {
|
||||||
SubscriberAgreementURL string
|
SubscriberAgreementURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SSLConfig reprents certificates and a key for authenticated TLS.
|
||||||
|
type SSLConfig struct {
|
||||||
|
CertFile string
|
||||||
|
KeyFile string
|
||||||
|
CACertFile *string // Optional
|
||||||
|
}
|
||||||
|
|
||||||
// Queue describes a queue name
|
// Queue describes a queue name
|
||||||
type Queue struct {
|
type Queue struct {
|
||||||
Server string
|
Server string
|
||||||
|
|
@ -188,19 +199,57 @@ func (as *AppShell) VersionString() string {
|
||||||
func FailOnError(err error, msg string) {
|
func FailOnError(err error, msg string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
|
// AUDIT[ Error Conditions ] 9cc4d537-8534-4970-8665-4b382abe82f3
|
||||||
panic(fmt.Sprintf("%s: %s", msg, err))
|
fmt.Fprintf(os.Stderr, "%s: %s", msg, err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AmqpChannel is the same as amqpConnect in boulder, but with even
|
// AmqpChannel is the same as amqpConnect in boulder, but with even
|
||||||
// more aggressive error dropping
|
// more aggressive error dropping
|
||||||
func AmqpChannel(url string) (ch *amqp.Channel) {
|
func AmqpChannel(conf Config) (*amqp.Channel, error) {
|
||||||
conn, err := amqp.Dial(url)
|
var conn *amqp.Connection
|
||||||
FailOnError(err, "Unable to connect to AMQP server")
|
|
||||||
|
|
||||||
ch, err = conn.Channel()
|
if conf.AMQP.SSL != nil {
|
||||||
FailOnError(err, "Unable to establish channel to AMQP server")
|
if strings.HasPrefix(conf.AMQP.Server, "amqps") == false {
|
||||||
return
|
err := fmt.Errorf("SSL configuration provided, but not using an AMQPS URL")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := new(tls.Config)
|
||||||
|
|
||||||
|
cert, err := tls.LoadX509KeyPair(conf.AMQP.SSL.CertFile, conf.AMQP.SSL.KeyFile)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("Could not load Client Certificates: %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cfg.Certificates = append(cfg.Certificates, cert)
|
||||||
|
|
||||||
|
if conf.AMQP.SSL.CACertFile != nil {
|
||||||
|
cfg.RootCAs = x509.NewCertPool()
|
||||||
|
|
||||||
|
ca, err := ioutil.ReadFile(*conf.AMQP.SSL.CACertFile)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("Could not load CA Certificate: %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cfg.RootCAs.AppendCertsFromPEM(ca)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err = amqp.DialTLS(conf.AMQP.Server, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn.Channel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configuration did not specify SSL options
|
||||||
|
conn, err := amqp.Dial(conf.AMQP.Server)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn.Channel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunForever starts the server and wait around
|
// RunForever starts the server and wait around
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,11 @@
|
||||||
|
|
||||||
"amqp": {
|
"amqp": {
|
||||||
"server": "amqp://guest:guest@localhost:5672",
|
"server": "amqp://guest:guest@localhost:5672",
|
||||||
|
"-uncomment_for_AMQPS-ssl": {
|
||||||
|
"cacertfile": "/etc/boulder/rabbitmq-cacert.pem",
|
||||||
|
"certfile": "/etc/boulder/rabbitmq-cert.pem",
|
||||||
|
"keyfile": "/etc/boulder/rabbitmq-key.pem"
|
||||||
|
},
|
||||||
"RA": {
|
"RA": {
|
||||||
"client": "RA.client",
|
"client": "RA.client",
|
||||||
"server": "RA.server"
|
"server": "RA.server"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue