From 7ef6913e7152529b8d275a1a126a5eceb3a953e4 Mon Sep 17 00:00:00 2001 From: Aaron Gable Date: Tue, 3 May 2022 13:18:40 -0700 Subject: [PATCH] Revert "Allow expiration mailer to work in parallel" (#6080) When deployed, the newly-parallel expiration-mailer encountered unexpected difficulties and dropped to apparently sending nearly zero emails despite not throwing any real errors. Reverting the parallelism change until we understand and can fix the root cause. This reverts two commits: - Allow expiration mailer to work in parallel (#6057) - Fix data race in expiration-mailer test mocks (#6072) It also modifies the revert to leave the new `ParallelSends` config key in place (albeit completely ignored), so that the binary containing this revert can be safely deployed regardless of config status. Part of #5682 --- cmd/bad-key-revoker/main.go | 6 +- cmd/expiration-mailer/main.go | 148 ++++++++------------ cmd/expiration-mailer/main_test.go | 45 +----- cmd/expiration-mailer/send_test.go | 4 +- cmd/notify-mailer/main.go | 6 +- mail/mailer.go | 177 +++++++++++------------- mail/mailer_test.go | 30 ++-- mocks/mocks.go | 26 +--- test/config-next/expiration-mailer.json | 1 - 9 files changed, 163 insertions(+), 280 deletions(-) diff --git a/cmd/bad-key-revoker/main.go b/cmd/bad-key-revoker/main.go index cde3af1e2..8ae11c24f 100644 --- a/cmd/bad-key-revoker/main.go +++ b/cmd/bad-key-revoker/main.go @@ -194,12 +194,12 @@ var maxSerials = 100 // sendMessage sends a single email to the provided address with the revoked // serials func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error { - conn, err := bkr.mailer.Connect() + err := bkr.mailer.Connect() if err != nil { return err } defer func() { - _ = conn.Close() + _ = bkr.mailer.Close() }() mutSerials := make([]string, len(serials)) copy(mutSerials, serials) @@ -213,7 +213,7 @@ func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error { if err != nil { return err } - err = conn.SendMail([]string{addr}, bkr.emailSubject, message.String()) + err = bkr.mailer.SendMail([]string{addr}, bkr.emailSubject, message.String()) if err != nil { return err } diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index 5debf63c6..407032ebe 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -15,7 +15,6 @@ import ( "os" "sort" "strings" - "sync" "text/template" "time" @@ -54,7 +53,6 @@ type mailer struct { emailTemplate *template.Template subjectTemplate *template.Template nagTimes []time.Duration - parallelSends uint limit int clk clock.Clock stats mailerStats @@ -68,7 +66,7 @@ type mailerStats struct { processingLatency prometheus.Histogram } -func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error { +func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error { if len(contacts) == 0 { return nil } @@ -163,7 +161,7 @@ func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Cert m.log.Infof("attempting send JSON=%s", string(logStr)) startSending := m.clk.Now() - err = conn.SendMail(emails, subjBuf.String(), msgBuf.String()) + err = m.mailer.SendMail(emails, subjBuf.String(), msgBuf.String()) if err != nil { m.log.Errf("failed send JSON=%s", string(logStr)) return err @@ -194,11 +192,6 @@ func (m *mailer) certIsRenewed(names []string, issued time.Time) (bool, error) { return present, err } -type work struct { - regID int64 - certs []core.Certificate -} - func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) { regIDToCerts := make(map[int64][]core.Certificate) @@ -208,104 +201,76 @@ func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) regIDToCerts[cert.RegistrationID] = cs } - var wg sync.WaitGroup - workChan := make(chan work) - - parallelSends := m.parallelSends - if parallelSends == 0 { - parallelSends = 1 + err := m.mailer.Connect() + if err != nil { + m.log.AuditErrf("Error connecting to send nag emails: %s", err) + return } + defer func() { + _ = m.mailer.Close() + }() - for senderNum := uint(0); senderNum < parallelSends; senderNum++ { - conn, err := m.mailer.Connect() - if err != nil { - m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err) - close(workChan) - return - } - - wg.Add(1) - go func(conn bmail.Conn, ch <-chan work) { - defer wg.Done() - for w := range ch { - err := m.sendToOneRegID(ctx, conn, w.regID, w.certs) - if err != nil { - m.log.AuditErr(err.Error()) - } - } - conn.Close() - }(conn, workChan) - - // For politeness' sake, don't open more than 1 new connection per - // second. - time.Sleep(time.Second) - } for regID, certs := range regIDToCerts { - workChan <- work{regID, certs} - } - close(workChan) - wg.Wait() -} - -func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certs []core.Certificate) error { - reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID}) - if err != nil { - m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc() - return fmt.Errorf("fetching registration %d: %w", regID, err) - } - - if reg.Contact == nil { - return nil - } - - parsedCerts := []*x509.Certificate{} - for _, cert := range certs { - parsedCert, err := x509.ParseCertificate(cert.DER) + reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID}) if err != nil { - m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc() - // TODO(#1420): tell registration about this error - return fmt.Errorf("parsing certificate %s: %w", cert.Serial, err) + m.log.AuditErrf("Error fetching registration %d: %s", regID, err) + m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc() + continue } - renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore) - if err != nil { - return fmt.Errorf("expiration-mailer: error fetching renewal state: %w", err) - } else if renewed { - m.stats.renewalCount.With(prometheus.Labels{}).Inc() - err := m.updateCertStatus(cert.Serial) + parsedCerts := []*x509.Certificate{} + for _, cert := range certs { + parsedCert, err := x509.ParseCertificate(cert.DER) if err != nil { - m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() - return fmt.Errorf("updating certificate status for %s: %w", cert.Serial, err) + // TODO(#1420): tell registration about this error + m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc() + continue } + + renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore) + if err != nil { + m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err) + // assume not renewed + } else if renewed { + m.log.Debugf("Cert %s is already renewed", cert.Serial) + m.stats.renewalCount.With(prometheus.Labels{}).Inc() + err := m.updateCertStatus(cert.Serial) + if err != nil { + m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() + } + continue + } + + parsedCerts = append(parsedCerts, parsedCert) + } + + if len(parsedCerts) == 0 { + // all certificates are renewed continue } - parsedCerts = append(parsedCerts, parsedCert) - } + if reg.Contact == nil { + continue + } - if len(parsedCerts) == 0 { - // all certificates are renewed - return nil - } - - err = m.sendNags(conn, reg.Contact, parsedCerts) - if err != nil { - m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc() - return fmt.Errorf("sending nag emails: %w", err) - } - for _, cert := range parsedCerts { - serial := core.SerialToString(cert.SerialNumber) - err = m.updateCertStatus(serial) + err = m.sendNags(reg.Contact, parsedCerts) if err != nil { - // Don't return immediately; we'd like to at least try and update the status for - // all certificates, even if one of them experienced an error (which might have - // been intermittent) - m.log.AuditErrf("updating certificate status for %s: %s", serial, err) - m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() + m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc() + m.log.AuditErrf("Error sending nag emails: %s", err) continue } + for _, cert := range parsedCerts { + serial := core.SerialToString(cert.SerialNumber) + err = m.updateCertStatus(serial) + if err != nil { + m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() + continue + } + } } - return nil } func (m *mailer) findExpiringCertificates(ctx context.Context) error { @@ -624,7 +589,6 @@ func main() { emailTemplate: tmpl, nagTimes: nags, limit: c.Mailer.CertLimit, - parallelSends: c.Mailer.ParallelSends, clk: clk, stats: initStats(scope), } diff --git a/cmd/expiration-mailer/main_test.go b/cmd/expiration-mailer/main_test.go index ddf1b037c..77dff26f6 100644 --- a/cmd/expiration-mailer/main_test.go +++ b/cmd/expiration-mailer/main_test.go @@ -7,7 +7,6 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/base64" - "errors" "fmt" "math/big" "net" @@ -22,7 +21,6 @@ import ( "github.com/letsencrypt/boulder/db" berrors "github.com/letsencrypt/boulder/errors" blog "github.com/letsencrypt/boulder/log" - bmail "github.com/letsencrypt/boulder/mail" "github.com/letsencrypt/boulder/metrics" "github.com/letsencrypt/boulder/mocks" "github.com/letsencrypt/boulder/sa" @@ -130,9 +128,7 @@ func TestSendNags(t *testing.T) { DNSNames: []string{"example.com"}, } - conn, err := m.mailer.Connect() - test.AssertNotError(t, err, "connecting SMTP") - err = m.sendNags(conn, []string{emailA}, []*x509.Certificate{cert}) + err := m.sendNags([]string{emailA}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Failed to send warning messages") test.AssertEquals(t, len(mc.Messages), 1) test.AssertEquals(t, mocks.MailerMessage{ @@ -142,9 +138,7 @@ func TestSendNags(t *testing.T) { }, mc.Messages[0]) mc.Clear() - conn, err = m.mailer.Connect() - test.AssertNotError(t, err, "connecting SMTP") - err = m.sendNags(conn, []string{emailA, emailB}, []*x509.Certificate{cert}) + err = m.sendNags([]string{emailA, emailB}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Failed to send warning messages") test.AssertEquals(t, len(mc.Messages), 2) test.AssertEquals(t, mocks.MailerMessage{ @@ -159,9 +153,7 @@ func TestSendNags(t *testing.T) { }, mc.Messages[1]) mc.Clear() - conn, err = m.mailer.Connect() - test.AssertNotError(t, err, "connecting SMTP") - err = m.sendNags(conn, []string{}, []*x509.Certificate{cert}) + err = m.sendNags([]string{}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Not an error to pass no email contacts") test.AssertEquals(t, len(mc.Messages), 0) @@ -225,37 +217,6 @@ func TestProcessCerts(t *testing.T) { } } -func TestProcessCertsParallel(t *testing.T) { - testCtx := setup(t, []time.Duration{time.Hour * 24 * 7}) - - testCtx.m.parallelSends = 2 - certs := addExpiringCerts(t, testCtx) - log.Clear() - testCtx.m.processCerts(context.Background(), certs) - // Test that the lastExpirationNagSent was updated for the certificate - // corresponding to serial4, which is set up as "already renewed" by - // addExpiringCerts. - if len(log.GetAllMatching("DEBUG: SQL: UPDATE certificateStatus .*2006-01-02 15:04:05.999999999.*\"000000000000000000000000000000001339\"")) != 1 { - t.Errorf("Expected an update to certificateStatus, got these log lines:\n%s", - strings.Join(log.GetAllMatching(".*"), "\n")) - } -} - -type erroringMailClient struct{} - -func (e erroringMailClient) Connect() (bmail.Conn, error) { - return nil, errors.New("whoopsie-doo") -} - -func TestProcessCertsConnectError(t *testing.T) { - testCtx := setup(t, []time.Duration{time.Hour * 24 * 7}) - - testCtx.m.mailer = erroringMailClient{} - certs := addExpiringCerts(t, testCtx) - // Checking that this terminates rather than deadlocks - testCtx.m.processCerts(context.Background(), certs) -} - func TestFindExpiringCertificates(t *testing.T) { testCtx := setup(t, []time.Duration{time.Hour * 24, time.Hour * 24 * 4, time.Hour * 24 * 7}) diff --git a/cmd/expiration-mailer/send_test.go b/cmd/expiration-mailer/send_test.go index 9ff4f9d03..80c5a7e3f 100644 --- a/cmd/expiration-mailer/send_test.go +++ b/cmd/expiration-mailer/send_test.go @@ -33,9 +33,7 @@ func TestSendEarliestCertInfo(t *testing.T) { serial2, ) - conn, err := ctx.m.mailer.Connect() - test.AssertNotError(t, err, "connecting SMTP") - err = ctx.m.sendNags(conn, []string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB}) + err := ctx.m.sendNags([]string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB}) if err != nil { t.Fatal(err) } diff --git a/cmd/notify-mailer/main.go b/cmd/notify-mailer/main.go index 1f8493ea1..8bc97bd82 100644 --- a/cmd/notify-mailer/main.go +++ b/cmd/notify-mailer/main.go @@ -154,12 +154,12 @@ func (m *mailer) run() error { m.log.Infof("Address %q was associated with the most recipients (%d)", mostRecipients, mostRecipientsLen) - conn, err := m.mailer.Connect() + err = m.mailer.Connect() if err != nil { return err } - defer func() { _ = conn.Close() }() + defer func() { _ = m.mailer.Close() }() startTime := m.clk.Now() sortedAddresses := sortAddresses(addressToRecipient) @@ -186,7 +186,7 @@ func (m *mailer) run() error { continue } - err = conn.SendMail([]string{address}, m.subject, messageBody) + err = m.mailer.SendMail([]string{address}, m.subject, messageBody) if err != nil { var badAddrErr bmail.BadAddressSMTPError if errors.As(err, &badAddrErr) { diff --git a/mail/mailer.go b/mail/mailer.go index 4249108eb..b20de9496 100644 --- a/mail/mailer.go +++ b/mail/mailer.go @@ -43,37 +43,20 @@ func (s realSource) generate() *big.Int { return randInt } -// Mailer is an interface that allows creating Conns. Implementations must -// be safe for concurrent use. +// Mailer provides the interface for a mailer type Mailer interface { - Connect() (Conn, error) -} - -// Conn is an interface that allows sending mail. When you are done with a -// Conn, call Close(). Implementations are not required to be safe for -// concurrent use. -type Conn interface { SendMail([]string, string, string) error + Connect() error Close() error } -// connImpl represents a single connection to a mail server. It is not safe -// for concurrent use. -type connImpl struct { - config - client smtpClient -} - -// mailerImpl defines a mail transfer agent to use for sending mail. It is -// safe for concurrent us. -type mailerImpl struct { - config -} - -type config struct { +// MailerImpl defines a mail transfer agent to use for sending mail. It is not +// safe for concurrent access. +type MailerImpl struct { log blog.Logger dialer dialer from mail.Address + client smtpClient clk clock.Clock csprgSource idGenerator reconnectBase time.Duration @@ -143,7 +126,7 @@ func New( logger blog.Logger, stats prometheus.Registerer, reconnectBase time.Duration, - reconnectMax time.Duration) *mailerImpl { + reconnectMax time.Duration) *MailerImpl { sendMailAttempts := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "send_mail_attempts", @@ -151,46 +134,42 @@ func New( }, []string{"result", "error"}) stats.MustRegister(sendMailAttempts) - return &mailerImpl{ - config: config{ - dialer: &dialerImpl{ - username: username, - password: password, - server: server, - port: port, - rootCAs: rootCAs, - }, - log: logger, - from: from, - clk: clock.New(), - csprgSource: realSource{}, - reconnectBase: reconnectBase, - reconnectMax: reconnectMax, - sendMailAttempts: sendMailAttempts, + return &MailerImpl{ + dialer: &dialerImpl{ + username: username, + password: password, + server: server, + port: port, + rootCAs: rootCAs, }, + log: logger, + from: from, + clk: clock.New(), + csprgSource: realSource{}, + reconnectBase: reconnectBase, + reconnectMax: reconnectMax, + sendMailAttempts: sendMailAttempts, } } // New constructs a Mailer suitable for doing a dry run. It simply logs each // command that would have been run, at debug level. -func NewDryRun(from mail.Address, logger blog.Logger) *mailerImpl { - return &mailerImpl{ - config: config{ - dialer: dryRunClient{logger}, - from: from, - clk: clock.New(), - csprgSource: realSource{}, - sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "send_mail_attempts", - Help: "A counter of send mail attempts labelled by result", - }, []string{"result", "error"}), - }, +func NewDryRun(from mail.Address, logger blog.Logger) *MailerImpl { + return &MailerImpl{ + dialer: dryRunClient{logger}, + from: from, + clk: clock.New(), + csprgSource: realSource{}, + sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "send_mail_attempts", + Help: "A counter of send mail attempts labelled by result", + }, []string{"result", "error"}), } } -func (c config) generateMessage(to []string, subject, body string) ([]byte, error) { - mid := c.csprgSource.generate() - now := c.clk.Now().UTC() +func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, error) { + mid := m.csprgSource.generate() + now := m.clk.Now().UTC() addrs := []string{} for _, a := range to { if !core.IsASCII(a) { @@ -200,10 +179,10 @@ func (c config) generateMessage(to []string, subject, body string) ([]byte, erro } headers := []string{ fmt.Sprintf("To: %s", strings.Join(addrs, ", ")), - fmt.Sprintf("From: %s", c.from.String()), + fmt.Sprintf("From: %s", m.from.String()), fmt.Sprintf("Subject: %s", subject), fmt.Sprintf("Date: %s", now.Format(time.RFC822)), - fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), c.from.Address), + fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), m.from.Address), "MIME-Version: 1.0", "Content-Type: text/plain; charset=UTF-8", "Content-Transfer-Encoding: quoted-printable", @@ -229,31 +208,31 @@ func (c config) generateMessage(to []string, subject, body string) ([]byte, erro )), nil } -func (c *connImpl) reconnect() { +func (m *MailerImpl) reconnect() { for i := 0; ; i++ { - sleepDuration := core.RetryBackoff(i, c.reconnectBase, c.reconnectMax, 2) - c.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration) - c.clk.Sleep(sleepDuration) - c.log.Info("attempting to reconnect mailer") - client, err := c.dialer.Dial() + sleepDuration := core.RetryBackoff(i, m.reconnectBase, m.reconnectMax, 2) + m.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration) + m.clk.Sleep(sleepDuration) + m.log.Info("attempting to reconnect mailer") + err := m.Connect() if err != nil { - c.log.Warningf("reconnect error: %s", err) + m.log.Warningf("reconnect error: %s", err) continue } - c.client = client break } - c.log.Info("reconnected successfully") + m.log.Info("reconnected successfully") } // Connect opens a connection to the specified mail server. It must be called // before SendMail. -func (m *mailerImpl) Connect() (Conn, error) { +func (m *MailerImpl) Connect() error { client, err := m.dialer.Dial() if err != nil { - return nil, err + return err } - return &connImpl{m.config, client}, nil + m.client = client + return nil } type dialerImpl struct { @@ -286,43 +265,43 @@ func (di *dialerImpl) Dial() (smtpClient, error) { // argument as an error. If the reset command also errors, it combines both // errors and returns them. Without this we would get `nested MAIL command`. // https://github.com/letsencrypt/boulder/issues/3191 -func (c *connImpl) resetAndError(err error) error { +func (m *MailerImpl) resetAndError(err error) error { if err == io.EOF { return err } - if err2 := c.client.Reset(); err2 != nil { + if err2 := m.client.Reset(); err2 != nil { return fmt.Errorf("%s (also, on sending RSET: %s)", err, err2) } return err } -func (c *connImpl) sendOne(to []string, subject, msg string) error { - if c.client == nil { +func (m *MailerImpl) sendOne(to []string, subject, msg string) error { + if m.client == nil { return errors.New("call Connect before SendMail") } - body, err := c.generateMessage(to, subject, msg) + body, err := m.generateMessage(to, subject, msg) if err != nil { return err } - if err = c.client.Mail(c.from.String()); err != nil { + if err = m.client.Mail(m.from.String()); err != nil { return err } for _, t := range to { - if err = c.client.Rcpt(t); err != nil { - return c.resetAndError(err) + if err = m.client.Rcpt(t); err != nil { + return m.resetAndError(err) } } - w, err := c.client.Data() + w, err := m.client.Data() if err != nil { - return c.resetAndError(err) + return m.resetAndError(err) } _, err = w.Write(body) if err != nil { - return c.resetAndError(err) + return m.resetAndError(err) } err = w.Close() if err != nil { - return c.resetAndError(err) + return m.resetAndError(err) } return nil } @@ -357,34 +336,34 @@ var badAddressErrorCodes = map[int]bool{ // SendMail sends an email to the provided list of recipients. The email body // is simple text. -func (c *connImpl) SendMail(to []string, subject, msg string) error { +func (m *MailerImpl) SendMail(to []string, subject, msg string) error { var protoErr *textproto.Error for { - err := c.sendOne(to, subject, msg) + err := m.sendOne(to, subject, msg) if err == nil { // If the error is nil, we sent the mail without issue. nice! break } else if err == io.EOF { - c.sendMailAttempts.WithLabelValues("failure", "EOF").Inc() + m.sendMailAttempts.WithLabelValues("failure", "EOF").Inc() // If the error is an EOF, we should try to reconnect on a backoff // schedule, sleeping between attempts. - c.reconnect() + m.reconnect() // After reconnecting, loop around and try `sendOne` again. continue } else if errors.Is(err, syscall.ECONNRESET) { - c.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc() + m.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc() // If the error is `syscall.ECONNRESET`, we should try to reconnect on a backoff // schedule, sleeping between attempts. - c.reconnect() + m.reconnect() // After reconnecting, loop around and try `sendOne` again. continue } else if errors.Is(err, syscall.EPIPE) { // EPIPE also seems to be a common way to signal TCP RST. - c.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc() - c.reconnect() + m.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc() + m.reconnect() continue } else if errors.As(err, &protoErr) && protoErr.Code == 421 { - c.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc() + m.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc() /* * If the error is an instance of `textproto.Error` with a SMTP error code, * and that error code is 421 then treat this as a reconnect-able event. @@ -400,30 +379,28 @@ func (c *connImpl) SendMail(to []string, subject, msg string) error { * * [0] - https://github.com/letsencrypt/boulder/issues/2249 */ - c.reconnect() + m.reconnect() // After reconnecting, loop around and try `sendOne` again. continue } else if errors.As(err, &protoErr) && badAddressErrorCodes[protoErr.Code] { - c.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc() + m.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc() return BadAddressSMTPError{fmt.Sprintf("%d: %s", protoErr.Code, protoErr.Msg)} } else { // If it wasn't an EOF error or a recoverable SMTP error it is unexpected and we // return from SendMail() with the error - c.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc() + m.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc() return err } } - c.sendMailAttempts.WithLabelValues("success", "").Inc() + m.sendMailAttempts.WithLabelValues("success", "").Inc() return nil } // Close closes the connection. -func (c *connImpl) Close() error { - err := c.client.Close() - if err != nil { - return err +func (m *MailerImpl) Close() error { + if m.client == nil { + return errors.New("call Connect before Close") } - c.client = nil - return nil + return m.client.Close() } diff --git a/mail/mailer_test.go b/mail/mailer_test.go index d41582fea..a192e40db 100644 --- a/mail/mailer_test.go +++ b/mail/mailer_test.go @@ -264,7 +264,7 @@ func rstHandler(rstFirst int) connHandler { } } -func setup(t *testing.T) (*mailerImpl, *net.TCPListener, func()) { +func setup(t *testing.T) (*MailerImpl, *net.TCPListener, func()) { fromAddress, _ := mail.ParseAddress("you-are-a-winner@example.com") log := blog.UseMock() @@ -322,11 +322,11 @@ func TestConnect(t *testing.T) { defer cleanUp() go listenForever(l, t, normalHandler) - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.Close() + err = m.Close() if err != nil { t.Errorf("Failed to clean up: %s", err) } @@ -344,11 +344,11 @@ func TestReconnectSuccess(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. The message should be delivered after `closedConns` reconnect // attempts. - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. Got err: %s", err) } @@ -361,12 +361,12 @@ func TestBadEmailError(t *testing.T) { go listenForever(l, t, badEmailHandler(messages)) - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error if err == nil { t.Errorf("Expected SendMail() to return an BadAddressSMTPError, got nil") @@ -393,11 +393,11 @@ func TestReconnectSMTP421(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. The message should be delivered after `closedConns` reconnect // attempts. - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. Got err: %s", err) } @@ -439,12 +439,12 @@ func TestOtherError(t *testing.T) { _, _ = conn.Write([]byte("250 Ok yr rset now\r\n")) }) - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error if err == nil { t.Errorf("Expected SendMail() to return an error, got nil") @@ -488,12 +488,12 @@ func TestOtherError(t *testing.T) { _, _ = conn.Write([]byte("nop\r\n")) }) - conn, err = m.Connect() + err = m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error test.AssertError(t, err, "SendMail didn't fail as expected") test.AssertEquals(t, err.Error(), "999 1.1.1 This would probably be bad? (also, on sending RSET: short response: nop)") @@ -511,11 +511,11 @@ func TestReconnectAfterRST(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. The message should be delivered after `closedConns` reconnect // attempts. - conn, err := m.Connect() + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. Got err: %s", err) } diff --git a/mocks/mocks.go b/mocks/mocks.go index fdff570ff..9a81f0488 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -10,7 +10,6 @@ import ( "io/ioutil" "math/rand" "net" - "sync" "time" "github.com/jmhodges/clock" @@ -23,7 +22,6 @@ import ( berrors "github.com/letsencrypt/boulder/errors" bgrpc "github.com/letsencrypt/boulder/grpc" "github.com/letsencrypt/boulder/identifier" - "github.com/letsencrypt/boulder/mail" "github.com/letsencrypt/boulder/probs" pubpb "github.com/letsencrypt/boulder/publisher/proto" sapb "github.com/letsencrypt/boulder/sa/proto" @@ -562,19 +560,9 @@ func (*PublisherClient) SubmitToSingleCTWithResult(_ context.Context, _ *pubpb.R // Mailer is a mock type Mailer struct { - sync.Mutex Messages []MailerMessage } -var _ mail.Mailer = &Mailer{} - -// mockMailerConn is a mock that satisfies the mail.Conn interface -type mockMailerConn struct { - parent *Mailer -} - -var _ mail.Conn = &mockMailerConn{} - // MailerMessage holds the captured emails from SendMail() type MailerMessage struct { To string @@ -584,17 +572,13 @@ type MailerMessage struct { // Clear removes any previously recorded messages func (m *Mailer) Clear() { - m.Lock() - defer m.Unlock() m.Messages = nil } // SendMail is a mock -func (m *mockMailerConn) SendMail(to []string, subject, msg string) error { - m.parent.Lock() - defer m.parent.Unlock() +func (m *Mailer) SendMail(to []string, subject, msg string) error { for _, rcpt := range to { - m.parent.Messages = append(m.parent.Messages, MailerMessage{ + m.Messages = append(m.Messages, MailerMessage{ To: rcpt, Subject: subject, Body: msg, @@ -604,13 +588,13 @@ func (m *mockMailerConn) SendMail(to []string, subject, msg string) error { } // Close is a mock -func (m *mockMailerConn) Close() error { +func (m *Mailer) Close() error { return nil } // Connect is a mock -func (m *Mailer) Connect() (mail.Conn, error) { - return &mockMailerConn{parent: m}, nil +func (m *Mailer) Connect() error { + return nil } // SAWithFailedChallenges is a mocks.StorageAuthority that has diff --git a/test/config-next/expiration-mailer.json b/test/config-next/expiration-mailer.json index a92147f87..0d12292ae 100644 --- a/test/config-next/expiration-mailer.json +++ b/test/config-next/expiration-mailer.json @@ -14,7 +14,6 @@ "nagCheckInterval": "24h", "emailTemplate": "test/example-expiration-template", "debugAddr": ":8008", - "parallelSends": 10, "tls": { "caCertFile": "test/grpc-creds/minica.pem", "certFile": "test/grpc-creds/expiration-mailer.boulder/cert.pem",