Revert "Allow expiration mailer to work in parallel" (#6080)
When deployed, the newly-parallel expiration-mailer encountered unexpected difficulties and dropped to apparently sending nearly zero emails despite not throwing any real errors. Reverting the parallelism change until we understand and can fix the root cause.

This reverts two commits:

- Allow expiration mailer to work in parallel (#6057)
- Fix data race in expiration-mailer test mocks (#6072)

It also modifies the revert to leave the new `ParallelSends` config key in place (albeit completely ignored), so that the binary containing this revert can be safely deployed regardless of config status.

Part of #5682
This commit is contained in: parent a2ff222fda, commit 7ef6913e71
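The deploy-safety point above comes from keeping the ParallelSends field in the config type while nothing consumes it. A minimal sketch of that idea, with assumed names (this is not the actual Boulder config struct):

    // Hypothetical, condensed example: encoding/json matches the
    // "parallelSends" key to this field (case-insensitively), so configs
    // that still set the key keep parsing; after this revert the field is
    // simply never read when constructing the mailer.
    type MailerConfig struct {
        CertLimit     int
        ParallelSends uint // accepted but ignored
    }

The diff below shows the revert itself; the garbled line-number gutters from the web view have been reconstructed as unified diff hunks.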
@@ -194,12 +194,12 @@ var maxSerials = 100
 // sendMessage sends a single email to the provided address with the revoked
 // serials
 func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
-    conn, err := bkr.mailer.Connect()
+    err := bkr.mailer.Connect()
     if err != nil {
         return err
     }
     defer func() {
-        _ = conn.Close()
+        _ = bkr.mailer.Close()
     }()
     mutSerials := make([]string, len(serials))
     copy(mutSerials, serials)

@@ -213,7 +213,7 @@ func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
     if err != nil {
         return err
     }
-    err = conn.SendMail([]string{addr}, bkr.emailSubject, message.String())
+    err = bkr.mailer.SendMail([]string{addr}, bkr.emailSubject, message.String())
     if err != nil {
         return err
     }

@@ -15,7 +15,6 @@ import (
     "os"
     "sort"
     "strings"
-    "sync"
     "text/template"
     "time"
 

@@ -54,7 +53,6 @@ type mailer struct {
     emailTemplate   *template.Template
     subjectTemplate *template.Template
     nagTimes        []time.Duration
-    parallelSends   uint
     limit           int
     clk             clock.Clock
     stats           mailerStats

@@ -68,7 +66,7 @@ type mailerStats struct {
     processingLatency prometheus.Histogram
 }
 
-func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error {
+func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error {
     if len(contacts) == 0 {
         return nil
     }

@@ -163,7 +161,7 @@ func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Cert
     m.log.Infof("attempting send JSON=%s", string(logStr))
 
     startSending := m.clk.Now()
-    err = conn.SendMail(emails, subjBuf.String(), msgBuf.String())
+    err = m.mailer.SendMail(emails, subjBuf.String(), msgBuf.String())
     if err != nil {
         m.log.Errf("failed send JSON=%s", string(logStr))
         return err

@@ -194,11 +192,6 @@ func (m *mailer) certIsRenewed(names []string, issued time.Time) (bool, error) {
     return present, err
 }
 
-type work struct {
-    regID int64
-    certs []core.Certificate
-}
-
 func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) {
     regIDToCerts := make(map[int64][]core.Certificate)
 

@@ -208,104 +201,76 @@ func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate)
         regIDToCerts[cert.RegistrationID] = cs
     }
 
-    var wg sync.WaitGroup
-    workChan := make(chan work)
-
-    parallelSends := m.parallelSends
-    if parallelSends == 0 {
-        parallelSends = 1
-    }
-
-    for senderNum := uint(0); senderNum < parallelSends; senderNum++ {
-        conn, err := m.mailer.Connect()
-        if err != nil {
-            m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err)
-            close(workChan)
-            return
-        }
-
-        wg.Add(1)
-        go func(conn bmail.Conn, ch <-chan work) {
-            defer wg.Done()
-            for w := range ch {
-                err := m.sendToOneRegID(ctx, conn, w.regID, w.certs)
-                if err != nil {
-                    m.log.AuditErr(err.Error())
-                }
-            }
-            conn.Close()
-        }(conn, workChan)
-
-        // For politeness' sake, don't open more than 1 new connection per
-        // second.
-        time.Sleep(time.Second)
-    }
-    for regID, certs := range regIDToCerts {
-        workChan <- work{regID, certs}
-    }
-    close(workChan)
-    wg.Wait()
-}
-
-func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certs []core.Certificate) error {
-    reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
-    if err != nil {
-        m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
-        return fmt.Errorf("fetching registration %d: %w", regID, err)
-    }
-
-    if reg.Contact == nil {
-        return nil
-    }
-
-    parsedCerts := []*x509.Certificate{}
-    for _, cert := range certs {
-        parsedCert, err := x509.ParseCertificate(cert.DER)
-        if err != nil {
-            m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
-            // TODO(#1420): tell registration about this error
-            return fmt.Errorf("parsing certificate %s: %w", cert.Serial, err)
-        }
-
-        renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore)
-        if err != nil {
-            return fmt.Errorf("expiration-mailer: error fetching renewal state: %w", err)
-        } else if renewed {
-            m.stats.renewalCount.With(prometheus.Labels{}).Inc()
-            err := m.updateCertStatus(cert.Serial)
-            if err != nil {
-                m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
-                return fmt.Errorf("updating certificate status for %s: %w", cert.Serial, err)
-            }
-            continue
-        }
-
-        parsedCerts = append(parsedCerts, parsedCert)
-    }
-
-    if len(parsedCerts) == 0 {
-        // all certificates are renewed
-        return nil
-    }
-
-    err = m.sendNags(conn, reg.Contact, parsedCerts)
-    if err != nil {
-        m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
-        return fmt.Errorf("sending nag emails: %w", err)
-    }
-    for _, cert := range parsedCerts {
-        serial := core.SerialToString(cert.SerialNumber)
-        err = m.updateCertStatus(serial)
-        if err != nil {
-            // Don't return immediately; we'd like to at least try and update the status for
-            // all certificates, even if one of them experienced an error (which might have
-            // been intermittent)
-            m.log.AuditErrf("updating certificate status for %s: %s", serial, err)
-            m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
-            continue
-        }
-    }
-    return nil
-}
+    err := m.mailer.Connect()
+    if err != nil {
+        m.log.AuditErrf("Error connecting to send nag emails: %s", err)
+        return
+    }
+    defer func() {
+        _ = m.mailer.Close()
+    }()
+
+    for regID, certs := range regIDToCerts {
+        reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
+        if err != nil {
+            m.log.AuditErrf("Error fetching registration %d: %s", regID, err)
+            m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
+            continue
+        }
+
+        parsedCerts := []*x509.Certificate{}
+        for _, cert := range certs {
+            parsedCert, err := x509.ParseCertificate(cert.DER)
+            if err != nil {
+                // TODO(#1420): tell registration about this error
+                m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err)
+                m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
+                continue
+            }
+
+            renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore)
+            if err != nil {
+                m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err)
+                // assume not renewed
+            } else if renewed {
+                m.log.Debugf("Cert %s is already renewed", cert.Serial)
+                m.stats.renewalCount.With(prometheus.Labels{}).Inc()
+                err := m.updateCertStatus(cert.Serial)
+                if err != nil {
+                    m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err)
+                    m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
+                }
+                continue
+            }
+
+            parsedCerts = append(parsedCerts, parsedCert)
+        }
+
+        if reg.Contact == nil {
+            continue
+        }
+
+        if len(parsedCerts) == 0 {
+            // all certificates are renewed
+            continue
+        }
+
+        err = m.sendNags(reg.Contact, parsedCerts)
+        if err != nil {
+            m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
+            m.log.AuditErrf("Error sending nag emails: %s", err)
+            continue
+        }
+        for _, cert := range parsedCerts {
+            serial := core.SerialToString(cert.SerialNumber)
+            err = m.updateCertStatus(serial)
+            if err != nil {
+                m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err)
+                m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
+                continue
+            }
+        }
+    }
+}
 
 func (m *mailer) findExpiringCertificates(ctx context.Context) error {

@@ -624,7 +589,6 @@ func main() {
         emailTemplate:   tmpl,
         nagTimes:        nags,
         limit:           c.Mailer.CertLimit,
-        parallelSends:   c.Mailer.ParallelSends,
         clk:             clk,
         stats:           initStats(scope),
     }

@@ -7,7 +7,6 @@ import (
     "crypto/x509"
     "crypto/x509/pkix"
     "encoding/base64"
-    "errors"
     "fmt"
     "math/big"
     "net"

@@ -22,7 +21,6 @@ import (
     "github.com/letsencrypt/boulder/db"
     berrors "github.com/letsencrypt/boulder/errors"
     blog "github.com/letsencrypt/boulder/log"
-    bmail "github.com/letsencrypt/boulder/mail"
     "github.com/letsencrypt/boulder/metrics"
     "github.com/letsencrypt/boulder/mocks"
     "github.com/letsencrypt/boulder/sa"

@@ -130,9 +128,7 @@ func TestSendNags(t *testing.T) {
         DNSNames: []string{"example.com"},
     }
 
-    conn, err := m.mailer.Connect()
-    test.AssertNotError(t, err, "connecting SMTP")
-    err = m.sendNags(conn, []string{emailA}, []*x509.Certificate{cert})
+    err := m.sendNags([]string{emailA}, []*x509.Certificate{cert})
     test.AssertNotError(t, err, "Failed to send warning messages")
     test.AssertEquals(t, len(mc.Messages), 1)
     test.AssertEquals(t, mocks.MailerMessage{

@@ -142,9 +138,7 @@ func TestSendNags(t *testing.T) {
     }, mc.Messages[0])
 
     mc.Clear()
-    conn, err = m.mailer.Connect()
-    test.AssertNotError(t, err, "connecting SMTP")
-    err = m.sendNags(conn, []string{emailA, emailB}, []*x509.Certificate{cert})
+    err = m.sendNags([]string{emailA, emailB}, []*x509.Certificate{cert})
     test.AssertNotError(t, err, "Failed to send warning messages")
     test.AssertEquals(t, len(mc.Messages), 2)
     test.AssertEquals(t, mocks.MailerMessage{

@@ -159,9 +153,7 @@ func TestSendNags(t *testing.T) {
     }, mc.Messages[1])
 
     mc.Clear()
-    conn, err = m.mailer.Connect()
-    test.AssertNotError(t, err, "connecting SMTP")
-    err = m.sendNags(conn, []string{}, []*x509.Certificate{cert})
+    err = m.sendNags([]string{}, []*x509.Certificate{cert})
     test.AssertNotError(t, err, "Not an error to pass no email contacts")
     test.AssertEquals(t, len(mc.Messages), 0)
 

@@ -225,37 +217,6 @@ func TestProcessCerts(t *testing.T) {
     }
 }
 
-func TestProcessCertsParallel(t *testing.T) {
-    testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})
-
-    testCtx.m.parallelSends = 2
-    certs := addExpiringCerts(t, testCtx)
-    log.Clear()
-    testCtx.m.processCerts(context.Background(), certs)
-    // Test that the lastExpirationNagSent was updated for the certificate
-    // corresponding to serial4, which is set up as "already renewed" by
-    // addExpiringCerts.
-    if len(log.GetAllMatching("DEBUG: SQL: UPDATE certificateStatus .*2006-01-02 15:04:05.999999999.*\"000000000000000000000000000000001339\"")) != 1 {
-        t.Errorf("Expected an update to certificateStatus, got these log lines:\n%s",
-            strings.Join(log.GetAllMatching(".*"), "\n"))
-    }
-}
-
-type erroringMailClient struct{}
-
-func (e erroringMailClient) Connect() (bmail.Conn, error) {
-    return nil, errors.New("whoopsie-doo")
-}
-
-func TestProcessCertsConnectError(t *testing.T) {
-    testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})
-
-    testCtx.m.mailer = erroringMailClient{}
-    certs := addExpiringCerts(t, testCtx)
-    // Checking that this terminates rather than deadlocks
-    testCtx.m.processCerts(context.Background(), certs)
-}
-
 func TestFindExpiringCertificates(t *testing.T) {
     testCtx := setup(t, []time.Duration{time.Hour * 24, time.Hour * 24 * 4, time.Hour * 24 * 7})
 

@@ -33,9 +33,7 @@ func TestSendEarliestCertInfo(t *testing.T) {
         serial2,
     )
 
-    conn, err := ctx.m.mailer.Connect()
-    test.AssertNotError(t, err, "connecting SMTP")
-    err = ctx.m.sendNags(conn, []string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
+    err := ctx.m.sendNags([]string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
     if err != nil {
         t.Fatal(err)
     }

@@ -154,12 +154,12 @@ func (m *mailer) run() error {
     m.log.Infof("Address %q was associated with the most recipients (%d)",
         mostRecipients, mostRecipientsLen)
 
-    conn, err := m.mailer.Connect()
+    err = m.mailer.Connect()
     if err != nil {
         return err
     }
 
-    defer func() { _ = conn.Close() }()
+    defer func() { _ = m.mailer.Close() }()
 
     startTime := m.clk.Now()
     sortedAddresses := sortAddresses(addressToRecipient)

@@ -186,7 +186,7 @@ func (m *mailer) run() error {
         continue
     }
 
-    err = conn.SendMail([]string{address}, m.subject, messageBody)
+    err = m.mailer.SendMail([]string{address}, m.subject, messageBody)
     if err != nil {
         var badAddrErr bmail.BadAddressSMTPError
         if errors.As(err, &badAddrErr) {

mail/mailer.go (177 changes)

@@ -43,37 +43,20 @@ func (s realSource) generate() *big.Int {
     return randInt
 }
 
-// Mailer is an interface that allows creating Conns. Implementations must
-// be safe for concurrent use.
-type Mailer interface {
-    Connect() (Conn, error)
-}
-
-// Conn is an interface that allows sending mail. When you are done with a
-// Conn, call Close(). Implementations are not required to be safe for
-// concurrent use.
-type Conn interface {
-    SendMail([]string, string, string) error
-    Close() error
-}
-
-// connImpl represents a single connection to a mail server. It is not safe
-// for concurrent use.
-type connImpl struct {
-    config
-    client smtpClient
-}
-
-// mailerImpl defines a mail transfer agent to use for sending mail. It is
-// safe for concurrent us.
-type mailerImpl struct {
-    config
-}
-
-type config struct {
+// Mailer provides the interface for a mailer
+type Mailer interface {
+    SendMail([]string, string, string) error
+    Connect() error
+    Close() error
+}
+
+// MailerImpl defines a mail transfer agent to use for sending mail. It is not
+// safe for concurrent access.
+type MailerImpl struct {
     log              blog.Logger
     dialer           dialer
     from             mail.Address
+    client           smtpClient
     clk              clock.Clock
     csprgSource      idGenerator
     reconnectBase    time.Duration

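For orientation while reading the remaining mail/mailer.go hunks: the hunk above restores a single three-method Mailer interface (Connect, SendMail, Close) in place of the Mailer/Conn split. A small caller-side sketch against the restored interface as it appears here (the helper name is ours; error handling kept minimal):

    package example

    import bmail "github.com/letsencrypt/boulder/mail"

    // sendOneMessage shows the restored call pattern: Connect, SendMail and
    // Close are all methods on the same Mailer value, rather than SendMail
    // and Close living on a per-connection Conn returned by Connect.
    func sendOneMessage(m bmail.Mailer, to, subject, body string) error {
        if err := m.Connect(); err != nil {
            return err
        }
        defer func() { _ = m.Close() }()
        return m.SendMail([]string{to}, subject, body)
    }
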
@@ -143,7 +126,7 @@ func New(
     logger blog.Logger,
     stats prometheus.Registerer,
     reconnectBase time.Duration,
-    reconnectMax time.Duration) *mailerImpl {
+    reconnectMax time.Duration) *MailerImpl {
 
     sendMailAttempts := prometheus.NewCounterVec(prometheus.CounterOpts{
         Name: "send_mail_attempts",

@@ -151,46 +134,42 @@ func New(
     }, []string{"result", "error"})
     stats.MustRegister(sendMailAttempts)
 
-    return &mailerImpl{
-        config: config{
-            dialer: &dialerImpl{
-                username: username,
-                password: password,
-                server:   server,
-                port:     port,
-                rootCAs:  rootCAs,
-            },
-            log:              logger,
-            from:             from,
-            clk:              clock.New(),
-            csprgSource:      realSource{},
-            reconnectBase:    reconnectBase,
-            reconnectMax:     reconnectMax,
-            sendMailAttempts: sendMailAttempts,
-        },
+    return &MailerImpl{
+        dialer: &dialerImpl{
+            username: username,
+            password: password,
+            server:   server,
+            port:     port,
+            rootCAs:  rootCAs,
+        },
+        log:              logger,
+        from:             from,
+        clk:              clock.New(),
+        csprgSource:      realSource{},
+        reconnectBase:    reconnectBase,
+        reconnectMax:     reconnectMax,
+        sendMailAttempts: sendMailAttempts,
     }
 }
 
 // New constructs a Mailer suitable for doing a dry run. It simply logs each
 // command that would have been run, at debug level.
-func NewDryRun(from mail.Address, logger blog.Logger) *mailerImpl {
-    return &mailerImpl{
-        config: config{
-            dialer:      dryRunClient{logger},
-            from:        from,
-            clk:         clock.New(),
-            csprgSource: realSource{},
-            sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{
-                Name: "send_mail_attempts",
-                Help: "A counter of send mail attempts labelled by result",
-            }, []string{"result", "error"}),
-        },
+func NewDryRun(from mail.Address, logger blog.Logger) *MailerImpl {
+    return &MailerImpl{
+        dialer:      dryRunClient{logger},
+        from:        from,
+        clk:         clock.New(),
+        csprgSource: realSource{},
+        sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Name: "send_mail_attempts",
+            Help: "A counter of send mail attempts labelled by result",
+        }, []string{"result", "error"}),
     }
 }
 
-func (c config) generateMessage(to []string, subject, body string) ([]byte, error) {
-    mid := c.csprgSource.generate()
-    now := c.clk.Now().UTC()
+func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, error) {
+    mid := m.csprgSource.generate()
+    now := m.clk.Now().UTC()
     addrs := []string{}
     for _, a := range to {
         if !core.IsASCII(a) {

@@ -200,10 +179,10 @@ func (c config) generateMessage(to []string, subject, body string) ([]byte, erro
     }
     headers := []string{
         fmt.Sprintf("To: %s", strings.Join(addrs, ", ")),
-        fmt.Sprintf("From: %s", c.from.String()),
+        fmt.Sprintf("From: %s", m.from.String()),
         fmt.Sprintf("Subject: %s", subject),
         fmt.Sprintf("Date: %s", now.Format(time.RFC822)),
-        fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), c.from.Address),
+        fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), m.from.Address),
         "MIME-Version: 1.0",
         "Content-Type: text/plain; charset=UTF-8",
         "Content-Transfer-Encoding: quoted-printable",

@@ -229,31 +208,31 @@ func (c config) generateMessage(to []string, subject, body string) ([]byte, erro
     )), nil
 }
 
-func (c *connImpl) reconnect() {
+func (m *MailerImpl) reconnect() {
     for i := 0; ; i++ {
-        sleepDuration := core.RetryBackoff(i, c.reconnectBase, c.reconnectMax, 2)
-        c.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration)
-        c.clk.Sleep(sleepDuration)
-        c.log.Info("attempting to reconnect mailer")
-        client, err := c.dialer.Dial()
+        sleepDuration := core.RetryBackoff(i, m.reconnectBase, m.reconnectMax, 2)
+        m.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration)
+        m.clk.Sleep(sleepDuration)
+        m.log.Info("attempting to reconnect mailer")
+        err := m.Connect()
         if err != nil {
-            c.log.Warningf("reconnect error: %s", err)
+            m.log.Warningf("reconnect error: %s", err)
             continue
         }
-        c.client = client
         break
     }
-    c.log.Info("reconnected successfully")
+    m.log.Info("reconnected successfully")
 }
 
 // Connect opens a connection to the specified mail server. It must be called
 // before SendMail.
-func (m *mailerImpl) Connect() (Conn, error) {
+func (m *MailerImpl) Connect() error {
     client, err := m.dialer.Dial()
     if err != nil {
-        return nil, err
+        return err
     }
-    return &connImpl{m.config, client}, nil
+    m.client = client
+    return nil
 }
 
 type dialerImpl struct {

@@ -286,43 +265,43 @@ func (di *dialerImpl) Dial() (smtpClient, error) {
 // argument as an error. If the reset command also errors, it combines both
 // errors and returns them. Without this we would get `nested MAIL command`.
 // https://github.com/letsencrypt/boulder/issues/3191
-func (c *connImpl) resetAndError(err error) error {
+func (m *MailerImpl) resetAndError(err error) error {
     if err == io.EOF {
         return err
     }
-    if err2 := c.client.Reset(); err2 != nil {
+    if err2 := m.client.Reset(); err2 != nil {
         return fmt.Errorf("%s (also, on sending RSET: %s)", err, err2)
     }
     return err
 }
 
-func (c *connImpl) sendOne(to []string, subject, msg string) error {
-    if c.client == nil {
+func (m *MailerImpl) sendOne(to []string, subject, msg string) error {
+    if m.client == nil {
         return errors.New("call Connect before SendMail")
     }
-    body, err := c.generateMessage(to, subject, msg)
+    body, err := m.generateMessage(to, subject, msg)
     if err != nil {
         return err
     }
-    if err = c.client.Mail(c.from.String()); err != nil {
+    if err = m.client.Mail(m.from.String()); err != nil {
         return err
     }
     for _, t := range to {
-        if err = c.client.Rcpt(t); err != nil {
-            return c.resetAndError(err)
+        if err = m.client.Rcpt(t); err != nil {
+            return m.resetAndError(err)
         }
     }
-    w, err := c.client.Data()
+    w, err := m.client.Data()
     if err != nil {
-        return c.resetAndError(err)
+        return m.resetAndError(err)
     }
     _, err = w.Write(body)
     if err != nil {
-        return c.resetAndError(err)
+        return m.resetAndError(err)
     }
     err = w.Close()
     if err != nil {
-        return c.resetAndError(err)
+        return m.resetAndError(err)
     }
     return nil
 }

@@ -357,34 +336,34 @@ var badAddressErrorCodes = map[int]bool{
 
 // SendMail sends an email to the provided list of recipients. The email body
 // is simple text.
-func (c *connImpl) SendMail(to []string, subject, msg string) error {
+func (m *MailerImpl) SendMail(to []string, subject, msg string) error {
     var protoErr *textproto.Error
     for {
-        err := c.sendOne(to, subject, msg)
+        err := m.sendOne(to, subject, msg)
         if err == nil {
             // If the error is nil, we sent the mail without issue. nice!
             break
         } else if err == io.EOF {
-            c.sendMailAttempts.WithLabelValues("failure", "EOF").Inc()
+            m.sendMailAttempts.WithLabelValues("failure", "EOF").Inc()
             // If the error is an EOF, we should try to reconnect on a backoff
             // schedule, sleeping between attempts.
-            c.reconnect()
+            m.reconnect()
             // After reconnecting, loop around and try `sendOne` again.
             continue
         } else if errors.Is(err, syscall.ECONNRESET) {
-            c.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc()
+            m.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc()
             // If the error is `syscall.ECONNRESET`, we should try to reconnect on a backoff
             // schedule, sleeping between attempts.
-            c.reconnect()
+            m.reconnect()
             // After reconnecting, loop around and try `sendOne` again.
             continue
         } else if errors.Is(err, syscall.EPIPE) {
             // EPIPE also seems to be a common way to signal TCP RST.
-            c.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc()
-            c.reconnect()
+            m.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc()
+            m.reconnect()
             continue
         } else if errors.As(err, &protoErr) && protoErr.Code == 421 {
-            c.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc()
+            m.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc()
             /*
              * If the error is an instance of `textproto.Error` with a SMTP error code,
              * and that error code is 421 then treat this as a reconnect-able event.

@@ -400,30 +379,28 @@ func (c *connImpl) SendMail(to []string, subject, msg string) error {
              *
              * [0] - https://github.com/letsencrypt/boulder/issues/2249
              */
-            c.reconnect()
+            m.reconnect()
             // After reconnecting, loop around and try `sendOne` again.
             continue
         } else if errors.As(err, &protoErr) && badAddressErrorCodes[protoErr.Code] {
-            c.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc()
+            m.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc()
             return BadAddressSMTPError{fmt.Sprintf("%d: %s", protoErr.Code, protoErr.Msg)}
         } else {
             // If it wasn't an EOF error or a recoverable SMTP error it is unexpected and we
             // return from SendMail() with the error
-            c.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc()
+            m.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc()
             return err
         }
     }
 
-    c.sendMailAttempts.WithLabelValues("success", "").Inc()
+    m.sendMailAttempts.WithLabelValues("success", "").Inc()
     return nil
 }
 
 // Close closes the connection.
-func (c *connImpl) Close() error {
-    err := c.client.Close()
-    if err != nil {
-        return err
+func (m *MailerImpl) Close() error {
+    if m.client == nil {
+        return errors.New("call Connect before Close")
     }
-    c.client = nil
-    return nil
+    return m.client.Close()
 }

@@ -264,7 +264,7 @@ func rstHandler(rstFirst int) connHandler {
     }
 }
 
-func setup(t *testing.T) (*mailerImpl, *net.TCPListener, func()) {
+func setup(t *testing.T) (*MailerImpl, *net.TCPListener, func()) {
     fromAddress, _ := mail.ParseAddress("you-are-a-winner@example.com")
     log := blog.UseMock()
 

@@ -322,11 +322,11 @@ func TestConnect(t *testing.T) {
     defer cleanUp()
 
     go listenForever(l, t, normalHandler)
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
-    err = conn.Close()
+    err = m.Close()
     if err != nil {
         t.Errorf("Failed to clean up: %s", err)
     }

@@ -344,11 +344,11 @@ func TestReconnectSuccess(t *testing.T) {
     // With a mailer client that has a max attempt > `closedConns` we expect no
     // error. The message should be delivered after `closedConns` reconnect
     // attempts.
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     if err != nil {
         t.Errorf("Expected SendMail() to not fail. Got err: %s", err)
     }

@@ -361,12 +361,12 @@ func TestBadEmailError(t *testing.T) {
 
     go listenForever(l, t, badEmailHandler(messages))
 
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
 
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     // We expect there to be an error
     if err == nil {
         t.Errorf("Expected SendMail() to return an BadAddressSMTPError, got nil")

@@ -393,11 +393,11 @@ func TestReconnectSMTP421(t *testing.T) {
     // With a mailer client that has a max attempt > `closedConns` we expect no
     // error. The message should be delivered after `closedConns` reconnect
     // attempts.
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     if err != nil {
         t.Errorf("Expected SendMail() to not fail. Got err: %s", err)
     }

@@ -439,12 +439,12 @@ func TestOtherError(t *testing.T) {
         _, _ = conn.Write([]byte("250 Ok yr rset now\r\n"))
     })
 
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
 
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     // We expect there to be an error
     if err == nil {
         t.Errorf("Expected SendMail() to return an error, got nil")

@@ -488,12 +488,12 @@ func TestOtherError(t *testing.T) {
 
         _, _ = conn.Write([]byte("nop\r\n"))
     })
-    conn, err = m.Connect()
+    err = m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
 
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     // We expect there to be an error
     test.AssertError(t, err, "SendMail didn't fail as expected")
     test.AssertEquals(t, err.Error(), "999 1.1.1 This would probably be bad? (also, on sending RSET: short response: nop)")

@@ -511,11 +511,11 @@ func TestReconnectAfterRST(t *testing.T) {
     // With a mailer client that has a max attempt > `closedConns` we expect no
     // error. The message should be delivered after `closedConns` reconnect
     // attempts.
-    conn, err := m.Connect()
+    err := m.Connect()
     if err != nil {
         t.Errorf("Failed to connect: %s", err)
     }
-    err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
+    err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
     if err != nil {
         t.Errorf("Expected SendMail() to not fail. Got err: %s", err)
     }

@@ -10,7 +10,6 @@ import (
     "io/ioutil"
     "math/rand"
     "net"
-    "sync"
    "time"
 
     "github.com/jmhodges/clock"

@@ -23,7 +22,6 @@ import (
     berrors "github.com/letsencrypt/boulder/errors"
     bgrpc "github.com/letsencrypt/boulder/grpc"
     "github.com/letsencrypt/boulder/identifier"
-    "github.com/letsencrypt/boulder/mail"
     "github.com/letsencrypt/boulder/probs"
     pubpb "github.com/letsencrypt/boulder/publisher/proto"
     sapb "github.com/letsencrypt/boulder/sa/proto"

@@ -562,19 +560,9 @@ func (*PublisherClient) SubmitToSingleCTWithResult(_ context.Context, _ *pubpb.R
 
 // Mailer is a mock
 type Mailer struct {
-    sync.Mutex
     Messages []MailerMessage
 }
 
-var _ mail.Mailer = &Mailer{}
-
-// mockMailerConn is a mock that satisfies the mail.Conn interface
-type mockMailerConn struct {
-    parent *Mailer
-}
-
-var _ mail.Conn = &mockMailerConn{}
-
 // MailerMessage holds the captured emails from SendMail()
 type MailerMessage struct {
     To string

@@ -584,17 +572,13 @@ type MailerMessage struct {
 
 // Clear removes any previously recorded messages
 func (m *Mailer) Clear() {
-    m.Lock()
-    defer m.Unlock()
     m.Messages = nil
 }
 
 // SendMail is a mock
-func (m *mockMailerConn) SendMail(to []string, subject, msg string) error {
-    m.parent.Lock()
-    defer m.parent.Unlock()
+func (m *Mailer) SendMail(to []string, subject, msg string) error {
     for _, rcpt := range to {
-        m.parent.Messages = append(m.parent.Messages, MailerMessage{
+        m.Messages = append(m.Messages, MailerMessage{
             To:      rcpt,
             Subject: subject,
             Body:    msg,

@@ -604,13 +588,13 @@ func (m *mockMailerConn) SendMail(to []string, subject, msg string) error {
 }
 
 // Close is a mock
-func (m *mockMailerConn) Close() error {
+func (m *Mailer) Close() error {
     return nil
 }
 
 // Connect is a mock
-func (m *Mailer) Connect() (mail.Conn, error) {
-    return &mockMailerConn{parent: m}, nil
+func (m *Mailer) Connect() error {
+    return nil
 }
 
 // SAWithFailedChallenges is a mocks.StorageAuthority that has

@@ -14,7 +14,6 @@
     "nagCheckInterval": "24h",
     "emailTemplate": "test/example-expiration-template",
     "debugAddr": ":8008",
-    "parallelSends": 10,
     "tls": {
       "caCertFile": "test/grpc-creds/minica.pem",
       "certFile": "test/grpc-creds/expiration-mailer.boulder/cert.pem",