Allow `mailer` to reconnect to server. (#2101)

The `MailerImpl` gains two new fields (`reconnectBase` and `reconnectMax`). These are used with `core.RetryBackoff` in `reconnect()` to implement exponential backoff in a reconnect attempt loop. Both `expiration-mailer` and `notify-mailer` are modified to add CLI flags (`-reconnectBase` and `-reconnectMax`) for these two values and to wire them into the `MailerImpl` via its `New()` constructor.

`MailerImpl`'s `SendMail()` function now detects when `sendOne` returns an `io.EOF` error, indicating that the server closed the connection unexpectedly. When this occurs, `reconnect()` is invoked. If the reconnect succeeds then we invoke `sendOne` again to try to complete the message sending operation that was interrupted by the disconnect.

For integration testing purposes I modified the `mail-test-srv` to support a `-closeFirst` parameter. This controls how many of the first client connections the server will close immediately upon receiving a `MAIL` command, before further processing. This allows us to simulate a flaky mailserver. `test/startservers.py` is modified to start the `mail-test-srv` with `--closeFirst 5` to thoroughly test the reconnection logic during the existing `expiration-mailer` integration tests. I took this as a chance to do some slight clean-up of the `mail-test-srv` code (mostly removing global state).

For unit testing purposes I modified the mailer `TestConnect` test to abstract out a server that can operate similarly to `mail-test-srv` (e.g. can close connections artificially).

This is testing a server that **closes** a connection, and not a server that **goes away/goes down**. E.g. the `core.RetryBackoff` sleeps themselves are not being tested. The client is disconnected and attempts a reconnection which always succeeds on the first try. To test a "gone away" server would require a more substantial rewrite of the unit tests and the `mail-test-srv`/integration tests. I think this matches the experience we have with MailChimp/Mandrill closing long-lived connections.
This commit is contained in:
Daniel McCarney 2016-08-15 17:14:49 -04:00 committed by Jacob Hoffman-Andrews
parent c5d6166f08
commit a584f8de46
8 changed files with 283 additions and 104 deletions

View File

@ -328,7 +328,11 @@ type config struct {
func main() {
configFile := flag.String("config", "", "File path to the configuration file for this service")
certLimit := flag.Int("cert_limit", 0, "Count of certificates to process per expiration period")
reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts")
reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff")
flag.Parse()
if *configFile == "" {
flag.Usage()
os.Exit(1)
@ -381,7 +385,10 @@ func main() {
c.Mailer.Username,
smtpPassword,
*fromAddress,
stats)
logger,
stats,
*reconnBase,
*reconnMax)
err = mailClient.Connect()
cmd.FailOnError(err, "Couldn't connect to mail server.")
defer func() {

View File

@ -268,6 +268,8 @@ func main() {
sleep := flag.Duration("sleep", 60*time.Second, "How long to sleep between emails.")
start := flag.Int("start", 0, "Line of input file to start from.")
end := flag.Int("end", 99999999, "Line of input file to end before.")
reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts")
reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff")
type config struct {
NotifyMailer struct {
cmd.DBConfig
@ -333,7 +335,10 @@ func main() {
cfg.NotifyMailer.Username,
smtpPassword,
*address,
stats)
log,
stats,
*reconnBase,
*reconnMax)
}
err = mailClient.Connect()
cmd.FailOnError(err, fmt.Sprintf("Connecting to %s:%s",

View File

@ -51,12 +51,15 @@ type Mailer interface {
// MailerImpl defines a mail transfer agent to use for sending mail. It is not
// safe for concurrent access.
type MailerImpl struct {
dialer dialer
from mail.Address
client smtpClient
clk clock.Clock
csprgSource idGenerator
stats *metrics.StatsdScope
log blog.Logger
dialer dialer
from mail.Address
client smtpClient
clk clock.Clock
csprgSource idGenerator
stats *metrics.StatsdScope
reconnectBase time.Duration
reconnectMax time.Duration
}
type dialer interface {
@ -103,7 +106,16 @@ func (d dryRunClient) Write(p []byte) (n int, err error) {
// New constructs a Mailer to represent an account on a particular mail
// transfer agent.
func New(server, port, username, password string, from mail.Address, stats statsd.Statter) *MailerImpl {
func New(
server,
port,
username,
password string,
from mail.Address,
logger blog.Logger,
stats statsd.Statter,
reconnectBase time.Duration,
reconnectMax time.Duration) *MailerImpl {
return &MailerImpl{
dialer: &dialerImpl{
username: username,
@ -111,10 +123,13 @@ func New(server, port, username, password string, from mail.Address, stats stats
server: server,
port: port,
},
from: from,
clk: clock.Default(),
csprgSource: realSource{},
stats: metrics.NewStatsdScope(stats, "Mailer"),
log: logger,
from: from,
clk: clock.Default(),
csprgSource: realSource{},
stats: metrics.NewStatsdScope(stats, "Mailer"),
reconnectBase: reconnectBase,
reconnectMax: reconnectMax,
}
}
@ -173,6 +188,22 @@ func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte,
)), nil
}
// reconnect blocks until m.Connect succeeds. Before every attempt it sleeps
// (via m.clk) for the duration core.RetryBackoff computes from the attempt
// number, m.reconnectBase, m.reconnectMax and a factor of 2. Each failed
// attempt is logged as a warning and retried; there is no upper bound on the
// number of attempts.
func (m *MailerImpl) reconnect() {
	attempt := 0
	for {
		backoff := core.RetryBackoff(attempt, m.reconnectBase, m.reconnectMax, 2)
		m.log.Info(fmt.Sprintf("sleeping for %s before reconnecting mailer", backoff))
		m.clk.Sleep(backoff)

		m.log.Info("attempting to reconnect mailer")
		if err := m.Connect(); err != nil {
			m.log.Warning(fmt.Sprintf("reconnect error: %s", err))
			attempt++
			continue
		}

		m.log.Info("reconnected successfully")
		return
	}
}
// Connect opens a connection to the specified mail server. It must be called
// before SendMail.
func (m *MailerImpl) Connect() error {
@ -249,10 +280,25 @@ func (m *MailerImpl) sendOne(to []string, subject, msg string) error {
func (m *MailerImpl) SendMail(to []string, subject, msg string) error {
m.stats.Inc("SendMail.Attempts", 1)
err := m.sendOne(to, subject, msg)
if err != nil {
m.stats.Inc("SendMail.Errors", 1)
return err
for {
err := m.sendOne(to, subject, msg)
if err == nil {
// If the error is nil, we sent the mail without issue. nice!
break
} else if err == io.EOF {
// If the error is an EOF, we should try to reconnect on a backoff
// schedule, sleeping between attempts.
m.stats.Inc("SendMail.Errors.EOF", 1)
m.reconnect()
// After reconnecting, loop around and try `sendOne` again.
m.stats.Inc("SendMail.Reconnects", 1)
continue
} else if err != nil {
// If it wasn't an EOF error it is unexpected and we return from
// SendMail() with an error
m.stats.Inc("SendMail.Errors", 1)
return err
}
}
m.stats.Inc("SendMail.Successes", 1)

View File

@ -9,10 +9,12 @@ import (
"net/mail"
"strings"
"testing"
"time"
"github.com/cactus/go-statsd-client/statsd"
"github.com/jmhodges/clock"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/test"
)
@ -26,7 +28,8 @@ func TestGenerateMessage(t *testing.T) {
fc := clock.NewFake()
stats, _ := statsd.NewNoopClient(nil)
fromAddress, _ := mail.ParseAddress("happy sender <send@email.com>")
m := New("", "", "", "", *fromAddress, stats)
log := blog.UseMock()
m := New("", "", "", "", *fromAddress, log, stats, 0, 0)
m.clk = fc
m.csprgSource = fakeSource{}
messageBytes, err := m.generateMessage([]string{"recv@email.com"}, "test subject", "this is the body\n")
@ -48,9 +51,10 @@ func TestGenerateMessage(t *testing.T) {
}
func TestFailNonASCIIAddress(t *testing.T) {
log := blog.UseMock()
stats, _ := statsd.NewNoopClient(nil)
fromAddress, _ := mail.ParseAddress("send@email.com")
m := New("", "", "", "", *fromAddress, stats)
m := New("", "", "", "", *fromAddress, log, stats, 0, 0)
_, err := m.generateMessage([]string{"遗憾@email.com"}, "test subject", "this is the body\n")
test.AssertError(t, err, "Allowed a non-ASCII to address incorrectly")
}
@ -58,7 +62,7 @@ func TestFailNonASCIIAddress(t *testing.T) {
func expect(t *testing.T, buf *bufio.Reader, expected string) error {
line, _, err := buf.ReadLine()
if err != nil {
t.Errorf("readline: %s\n", err)
t.Errorf("readline: %s expected: %s\n", err, expected)
return err
}
if string(line) != expected {
@ -68,61 +72,128 @@ func expect(t *testing.T, buf *bufio.Reader, expected string) error {
return nil
}
func TestConnect(t *testing.T) {
port := "16632"
l, err := net.Listen("tcp", ":"+port)
if err != nil {
t.Errorf("listen: %s", err)
}
go func() {
defer func() {
err := l.Close()
if err != nil {
t.Errorf("listen.Close: %s", err)
}
}()
for {
conn, err := l.Accept()
if err != nil {
return
}
go func() {
defer func() {
err := conn.Close()
if err != nil {
t.Errorf("conn.Close: %s", err)
}
}()
buf := bufio.NewReader(conn)
// we can ignore write errors because any
// failures will be caught on the connecting
// side
_, _ = conn.Write([]byte("220 smtp.example.com ESMTP\n"))
if err := expect(t, buf, "EHLO localhost"); err != nil {
return
}
// connHandler is the signature of a per-connection handler for the test SMTP
// server. It receives the connection's sequence number, the running test and
// the accepted connection.
type connHandler func(int, *testing.T, net.Conn)
_, _ = conn.Write([]byte("250-PIPELINING\n"))
_, _ = conn.Write([]byte("250-AUTH PLAIN LOGIN\n"))
_, _ = conn.Write([]byte("250 8BITMIME\n"))
// Base64 encoding of "user@example.com\0paswd"
if err := expect(t, buf, "AUTH PLAIN AHVzZXJAZXhhbXBsZS5jb20AcGFzd2Q="); err != nil {
return
}
_, _ = conn.Write([]byte("235 2.7.0 Authentication successful\n"))
}()
// listenForever accepts connections on l until Accept returns an error, at
// which point it returns. Every accepted connection is handed to handler in
// its own goroutine together with a connection ID; IDs start at 1 and
// increase by one per accepted connection.
func listenForever(l net.Listener, t *testing.T, handler connHandler) {
	for connID := 1; ; connID++ {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		go handler(connID, t, conn)
	}
}
// authenticateClient plays the server side of the SMTP greeting and AUTH
// exchange on conn: it sends the 220 banner, expects "EHLO localhost",
// advertises its capabilities, expects the AUTH PLAIN line for
// user@example.com and finally reports successful authentication. It returns
// early, without further writes, as soon as an expected client line is not
// received.
func authenticateClient(t *testing.T, conn net.Conn) {
	reader := bufio.NewReader(conn)

	// Write errors are deliberately ignored: any failure will be caught on
	// the connecting side.
	reply := func(line string) {
		_, _ = conn.Write([]byte(line))
	}

	reply("220 smtp.example.com ESMTP\n")
	if expect(t, reader, "EHLO localhost") != nil {
		return
	}

	capabilities := []string{
		"250-PIPELINING\n",
		"250-AUTH PLAIN LOGIN\n",
		"250 8BITMIME\n",
	}
	for _, line := range capabilities {
		reply(line)
	}

	// Base64 encoding of "user@example.com\0paswd"
	if expect(t, reader, "AUTH PLAIN AHVzZXJAZXhhbXBsZS5jb20AcGFzd2Q=") != nil {
		return
	}
	reply("235 2.7.0 Authentication successful\n")
}
// normalHandler is a connHandler that only runs the authentication exchange
// and then closes the connection without processing any further commands.
// That is all TestConnect() needs. A failure to close the connection is
// reported as a test error.
func normalHandler(connID int, t *testing.T, conn net.Conn) {
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			t.Errorf("conn.Close: %s", closeErr)
		}
	}()

	authenticateClient(t, conn)
}
// disconnectHandler returns a connHandler that authenticates the client the
// same way normalHandler does and then walks through an email flow (MAIL,
// RCPT and DATA commands).
//
// For connections whose `connID` is <= `closeFirst`, the handler returns (and
// thereby closes the connection) right after the MAIL command is received,
// before any 250 response is written. The first `closeFirst` connections
// therefore never complete normally, which lets tests exercise reconnection
// logic.
func disconnectHandler(closeFirst int) connHandler {
	return func(connID int, t *testing.T, conn net.Conn) {
		defer func() {
			if closeErr := conn.Close(); closeErr != nil {
				t.Errorf("conn.Close: %s", closeErr)
			}
		}()

		authenticateClient(t, conn)

		// NOTE(review): this is a second bufio.Reader on the same conn; it
		// presumably relies on the client not sending MAIL until it has seen
		// the authentication reply - confirm.
		buf := bufio.NewReader(conn)
		if expect(t, buf, "MAIL FROM:<<you-are-a-winner@example.com>> BODY=8BITMIME") != nil {
			return
		}

		if connID <= closeFirst {
			fmt.Printf("Cutting off client early\n")
			return
		}

		// Each step sends a reply and then waits for the client's next
		// command; the handler gives up as soon as a command doesn't match.
		steps := []struct {
			reply string
			want  string
		}{
			{"250 Sure. Go on. \r\n", "RCPT TO:<hi@bye.com>"},
			{"250 Tell Me More \r\n", "DATA"},
		}
		for _, step := range steps {
			_, _ = conn.Write([]byte(step.reply))
			if expect(t, buf, step.want) != nil {
				return
			}
		}

		_, _ = conn.Write([]byte("354 Cool Data\r\n"))
		_, _ = conn.Write([]byte("250 Peace Out\r\n"))
	}
}
func setup(t *testing.T) (*MailerImpl, net.Listener, func()) {
const port = "16632"
stats, _ := statsd.NewNoopClient(nil)
fromAddress, _ := mail.ParseAddress("send@email.com")
fromAddress, _ := mail.ParseAddress("you-are-a-winner@example.com")
log := blog.UseMock()
m := New(
"localhost",
port,
"user@example.com",
"paswd",
*fromAddress,
stats)
err = m.Connect()
log,
stats,
time.Second*2, time.Second*10)
l, err := net.Listen("tcp", ":"+port)
if err != nil {
t.Fatalf("listen: %s", err)
}
cleanUp := func() {
err := l.Close()
if err != nil {
t.Errorf("listen.Close: %s", err)
}
}
return m, l, cleanUp
}
func TestConnect(t *testing.T) {
m, l, cleanUp := setup(t)
defer cleanUp()
go listenForever(l, t, normalHandler)
err := m.Connect()
if err != nil {
t.Errorf("Failed to connect: %s", err)
}
@ -131,3 +202,25 @@ func TestConnect(t *testing.T) {
t.Errorf("Failed to clean up: %s", err)
}
}
// TestReconnectSuccess checks that SendMail() succeeds even though the test
// server cuts off the first `closedConns` connections after the MAIL command.
func TestReconnectSuccess(t *testing.T) {
	m, l, cleanUp := setup(t)
	defer cleanUp()
	const closedConns = 5

	// Configure a test server that will disconnect the first `closedConns`
	// connections after the MAIL cmd
	go listenForever(l, t, disconnectHandler(closedConns))

	// The initial connection is a precondition for the rest of the test, so a
	// failure here aborts immediately instead of going on to call SendMail()
	// on a mailer that never connected.
	if err := m.Connect(); err != nil {
		t.Fatalf("Failed to connect: %s", err)
	}

	// We expect no error: the message should be delivered once the server
	// stops closing connections, i.e. after `closedConns` dropped
	// connections.
	err := m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding")
	if err != nil {
		t.Errorf("Expected SendMail() to not fail. Got err: %s", err)
	}
}

View File

@ -33,26 +33,26 @@ func (f *toFilter) Match(m rcvdMail) bool {
/mail/1?to=foo@bar.com - second mail for foo@bar.com
*/
func setupHTTP(serveMux *http.ServeMux) {
serveMux.HandleFunc("/count", httpCount)
serveMux.HandleFunc("/clear", httpClear)
serveMux.Handle("/mail/", http.StripPrefix("/mail/", http.HandlerFunc(httpGetMail)))
func (s *mailSrv) setupHTTP(serveMux *http.ServeMux) {
serveMux.HandleFunc("/count", s.httpCount)
serveMux.HandleFunc("/clear", s.httpClear)
serveMux.Handle("/mail/", http.StripPrefix("/mail/", http.HandlerFunc(s.httpGetMail)))
}
func httpClear(w http.ResponseWriter, r *http.Request) {
func (s *mailSrv) httpClear(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
allMailMutex.Lock()
allReceivedMail = nil
allMailMutex.Unlock()
s.allMailMutex.Lock()
s.allReceivedMail = nil
s.allMailMutex.Unlock()
w.WriteHeader(200)
} else {
w.WriteHeader(405)
}
}
func httpCount(w http.ResponseWriter, r *http.Request) {
func (s *mailSrv) httpCount(w http.ResponseWriter, r *http.Request) {
count := 0
iterMail(extractFilter(r), func(m rcvdMail) bool {
s.iterMail(extractFilter(r), func(m rcvdMail) bool {
count++
return false
})
@ -61,7 +61,7 @@ func httpCount(w http.ResponseWriter, r *http.Request) {
var rgxGetMail = regexp.MustCompile(`^(\d+)/?$`)
func httpGetMail(w http.ResponseWriter, r *http.Request) {
func (s *mailSrv) httpGetMail(w http.ResponseWriter, r *http.Request) {
mailNum, err := strconv.Atoi(strings.Trim(r.URL.Path, "/"))
if err != nil {
w.WriteHeader(400)
@ -69,7 +69,7 @@ func httpGetMail(w http.ResponseWriter, r *http.Request) {
return
}
idx := 0
found := iterMail(extractFilter(r), func(m rcvdMail) bool {
found := s.iterMail(extractFilter(r), func(m rcvdMail) bool {
if mailNum == idx {
printMail(w, m)
return true
@ -88,10 +88,10 @@ func extractFilter(r *http.Request) toFilter {
return toFilter{To: to}
}
func iterMail(f toFilter, cb func(rcvdMail) bool) bool {
allMailMutex.Lock()
defer allMailMutex.Unlock()
for _, v := range allReceivedMail {
func (s *mailSrv) iterMail(f toFilter, cb func(rcvdMail) bool) bool {
s.allMailMutex.Lock()
defer s.allMailMutex.Unlock()
for _, v := range s.allReceivedMail {
if !f.Match(v) {
continue
}

View File

@ -21,29 +21,31 @@ func reqAndRecorder(t testing.TB, method, relativeUrl string, body io.Reader) (*
}
func TestHTTPClear(t *testing.T) {
srv := mailSrv{}
w, r := reqAndRecorder(t, "POST", "/clear", nil)
allReceivedMail = []rcvdMail{rcvdMail{}}
httpClear(w, r)
srv.allReceivedMail = []rcvdMail{rcvdMail{}}
srv.httpClear(w, r)
if w.Code != 200 {
t.Errorf("expected 200, got %d", w.Code)
}
if len(allReceivedMail) != 0 {
if len(srv.allReceivedMail) != 0 {
t.Error("/clear failed to clear mail buffer")
}
w, r = reqAndRecorder(t, "GET", "/clear", nil)
allReceivedMail = []rcvdMail{rcvdMail{}}
httpClear(w, r)
srv.allReceivedMail = []rcvdMail{rcvdMail{}}
srv.httpClear(w, r)
if w.Code != 405 {
t.Errorf("expected 405, got %d", w.Code)
}
if len(allReceivedMail) != 1 {
if len(srv.allReceivedMail) != 1 {
t.Error("GET /clear cleared the mail buffer")
}
}
func TestHTTPCount(t *testing.T) {
allReceivedMail = []rcvdMail{
srv := mailSrv{}
srv.allReceivedMail = []rcvdMail{
rcvdMail{From: "a", To: "b"},
rcvdMail{From: "a", To: "b"},
rcvdMail{From: "a", To: "c"},
@ -66,7 +68,7 @@ func TestHTTPCount(t *testing.T) {
buf.Reset()
w.Body = &buf
httpCount(w, r)
srv.httpCount(w, r)
if w.Code != 200 {
t.Errorf("%s: expected 200, got %d", test.URL, w.Code)
}

View File

@ -12,11 +12,19 @@ import (
"regexp"
"strings"
"sync"
"time"
blog "github.com/letsencrypt/boulder/log"
)
var listenAPI = flag.String("http", "0.0.0.0:9381", "http port to listen on")
// mailSrv holds the state of the test mail server that was previously kept in
// package-level variables, plus the settings for artificially closing
// connections.
type mailSrv struct {
// closeFirst is set from the -closeFirst flag; handleConn compares it
// against connNumber when a MAIL command arrives.
closeFirst uint
// NOTE(review): closeDuration is not referenced in the visible code -
// confirm whether it is still needed.
closeDuration time.Duration
// allReceivedMail is the list of mail received so far; it is read and
// written while holding allMailMutex.
allReceivedMail []rcvdMail
allMailMutex sync.Mutex
// connNumber is incremented once per handled connection; it is read and
// written while holding connNumberMutex.
connNumber uint
connNumberMutex sync.RWMutex
}
type rcvdMail struct {
From string
@ -24,9 +32,6 @@ type rcvdMail struct {
Mail string
}
var allReceivedMail []rcvdMail
var allMailMutex sync.Mutex
func expectLine(buf *bufio.Reader, expected string) error {
line, _, err := buf.ReadLine()
if err != nil {
@ -44,8 +49,11 @@ var dataRegex = regexp.MustCompile("^DATA\\s*$")
var smtpErr501 = []byte("501 syntax error in parameters or arguments \r\n")
var smtpOk250 = []byte("250 OK \r\n")
func handleConn(conn net.Conn) {
func (srv *mailSrv) handleConn(conn net.Conn) {
defer conn.Close()
srv.connNumberMutex.Lock()
srv.connNumber++
srv.connNumberMutex.Unlock()
auditlogger := blog.Get()
auditlogger.Info(fmt.Sprintf("mail-test-srv: Got connection from %s", conn.RemoteAddr()))
@ -91,6 +99,15 @@ func handleConn(conn net.Conn) {
case "NOOP":
conn.Write(smtpOk250)
case "MAIL":
srv.connNumberMutex.RLock()
if srv.connNumber <= srv.closeFirst {
log.Printf(
"mail-test-srv: connection # %d < -closeFirst parameter %d, disconnecting client. Bye!\n",
srv.connNumber, srv.closeFirst)
clearState()
conn.Close()
}
srv.connNumberMutex.RUnlock()
clearState()
matches := mailFromRegex.FindStringSubmatch(line)
if matches == nil {
@ -135,13 +152,13 @@ func handleConn(conn net.Conn) {
From: fromAddr,
Mail: msgBuf.String(),
}
allMailMutex.Lock()
srv.allMailMutex.Lock()
for _, rcpt := range toAddr {
mailResult.To = rcpt
allReceivedMail = append(allReceivedMail, mailResult)
srv.allReceivedMail = append(srv.allReceivedMail, mailResult)
log.Printf("mail-test-srv: Got mail: %s -> %s\n", fromAddr, rcpt)
}
allMailMutex.Unlock()
srv.allMailMutex.Unlock()
conn.Write([]byte("250 Got mail \r\n"))
clearState()
}
@ -151,24 +168,33 @@ func handleConn(conn net.Conn) {
}
}
func serveSMTP(l net.Listener) error {
func (srv *mailSrv) serveSMTP(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
go handleConn(conn)
go srv.handleConn(conn)
}
}
func main() {
l, err := net.Listen("tcp", "0.0.0.0:9380")
var listenAPI = flag.String("http", "0.0.0.0:9381", "http port to listen on")
var listenSMTP = flag.String("smtp", "0.0.0.0:9380", "smtp port to listen on")
var closeFirst = flag.Uint("closeFirst", 0, "close first n connections after MAIL for reconnection tests")
flag.Parse()
l, err := net.Listen("tcp", *listenSMTP)
if err != nil {
log.Fatalln("Couldn't bind for SMTP", err)
log.Fatalln("Couldn't bind %q for SMTP", *listenSMTP, err)
}
defer l.Close()
setupHTTP(http.DefaultServeMux)
srv := mailSrv{
closeFirst: *closeFirst,
}
srv.setupHTTP(http.DefaultServeMux)
go func() {
err := http.ListenAndServe(*listenAPI, http.DefaultServeMux)
if err != nil {
@ -176,7 +202,7 @@ func main() {
}
}()
err = serveSMTP(l)
err = srv.serveSMTP(l)
if err != nil {
log.Fatalln(err, "Failed to accept connection")
}

View File

@ -57,7 +57,7 @@ def start(race_detection):
'ocsp-responder --config %s' % os.path.join(default_config_dir, "ocsp-responder.json"),
'ct-test-srv',
'dns-test-srv',
'mail-test-srv',
'mail-test-srv --closeFirst 5',
'ocsp-responder --config test/issuer-ocsp-responder.json',
'caa-checker --config cmd/caa-checker/test-config.yml'
]