diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index 3a46d2f5f..12003bfd0 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -328,7 +328,11 @@ type config struct { func main() { configFile := flag.String("config", "", "File path to the configuration file for this service") certLimit := flag.Int("cert_limit", 0, "Count of certificates to process per expiration period") + reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts") + reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff") + flag.Parse() + if *configFile == "" { flag.Usage() os.Exit(1) @@ -381,7 +385,10 @@ func main() { c.Mailer.Username, smtpPassword, *fromAddress, - stats) + logger, + stats, + *reconnBase, + *reconnMax) err = mailClient.Connect() cmd.FailOnError(err, "Couldn't connect to mail server.") defer func() { diff --git a/cmd/notify-mailer/main.go b/cmd/notify-mailer/main.go index 8cbf3160f..ac67a1343 100644 --- a/cmd/notify-mailer/main.go +++ b/cmd/notify-mailer/main.go @@ -268,6 +268,8 @@ func main() { sleep := flag.Duration("sleep", 60*time.Second, "How long to sleep between emails.") start := flag.Int("start", 0, "Line of input file to start from.") end := flag.Int("end", 99999999, "Line of input file to end before.") + reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts") + reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff") type config struct { NotifyMailer struct { cmd.DBConfig @@ -333,7 +335,10 @@ func main() { cfg.NotifyMailer.Username, smtpPassword, *address, - stats) + log, + stats, + *reconnBase, + *reconnMax) } err = mailClient.Connect() cmd.FailOnError(err, fmt.Sprintf("Connecting to %s:%s", diff --git a/mail/mailer.go b/mail/mailer.go index 4fd8c3193..0169e6e5a 100644 --- a/mail/mailer.go +++ b/mail/mailer.go @@ -51,12 +51,15 @@ type Mailer interface { // MailerImpl defines a mail transfer agent to use for sending mail. It is not // safe for concurrent access. type MailerImpl struct { - dialer dialer - from mail.Address - client smtpClient - clk clock.Clock - csprgSource idGenerator - stats *metrics.StatsdScope + log blog.Logger + dialer dialer + from mail.Address + client smtpClient + clk clock.Clock + csprgSource idGenerator + stats *metrics.StatsdScope + reconnectBase time.Duration + reconnectMax time.Duration } type dialer interface { @@ -103,7 +106,16 @@ func (d dryRunClient) Write(p []byte) (n int, err error) { // New constructs a Mailer to represent an account on a particular mail // transfer agent. -func New(server, port, username, password string, from mail.Address, stats statsd.Statter) *MailerImpl { +func New( + server, + port, + username, + password string, + from mail.Address, + logger blog.Logger, + stats statsd.Statter, + reconnectBase time.Duration, + reconnectMax time.Duration) *MailerImpl { return &MailerImpl{ dialer: &dialerImpl{ username: username, @@ -111,10 +123,13 @@ func New(server, port, username, password string, from mail.Address, stats stats server: server, port: port, }, - from: from, - clk: clock.Default(), - csprgSource: realSource{}, - stats: metrics.NewStatsdScope(stats, "Mailer"), + log: logger, + from: from, + clk: clock.Default(), + csprgSource: realSource{}, + stats: metrics.NewStatsdScope(stats, "Mailer"), + reconnectBase: reconnectBase, + reconnectMax: reconnectMax, } } @@ -173,6 +188,22 @@ func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, )), nil } +func (m *MailerImpl) reconnect() { + for i := 0; ; i++ { + sleepDuration := core.RetryBackoff(i, m.reconnectBase, m.reconnectMax, 2) + m.log.Info(fmt.Sprintf("sleeping for %s before reconnecting mailer", sleepDuration)) + m.clk.Sleep(sleepDuration) + m.log.Info("attempting to reconnect mailer") + err := m.Connect() + if err != nil { + m.log.Warning(fmt.Sprintf("reconnect error: %s", err)) + continue + } + break + } + m.log.Info("reconnected successfully") +} + // Connect opens a connection to the specified mail server. It must be called // before SendMail. func (m *MailerImpl) Connect() error { @@ -249,10 +280,25 @@ func (m *MailerImpl) sendOne(to []string, subject, msg string) error { func (m *MailerImpl) SendMail(to []string, subject, msg string) error { m.stats.Inc("SendMail.Attempts", 1) - err := m.sendOne(to, subject, msg) - if err != nil { - m.stats.Inc("SendMail.Errors", 1) - return err + for { + err := m.sendOne(to, subject, msg) + if err == nil { + // If the error is nil, we sent the mail without issue. nice! + break + } else if err == io.EOF { + // If the error is an EOF, we should try to reconnect on a backoff + // schedule, sleeping between attempts. + m.stats.Inc("SendMail.Errors.EOF", 1) + m.reconnect() + // After reconnecting, loop around and try `sendOne` again. + m.stats.Inc("SendMail.Reconnects", 1) + continue + } else if err != nil { + // If it wasn't an EOF error it is unexpected and we return from + // SendMail() with an error + m.stats.Inc("SendMail.Errors", 1) + return err + } } m.stats.Inc("SendMail.Successes", 1) diff --git a/mail/mailer_test.go b/mail/mailer_test.go index 4d136a8c9..91406d03b 100644 --- a/mail/mailer_test.go +++ b/mail/mailer_test.go @@ -9,10 +9,12 @@ import ( "net/mail" "strings" "testing" + "time" "github.com/cactus/go-statsd-client/statsd" "github.com/jmhodges/clock" + blog "github.com/letsencrypt/boulder/log" "github.com/letsencrypt/boulder/test" ) @@ -26,7 +28,8 @@ func TestGenerateMessage(t *testing.T) { fc := clock.NewFake() stats, _ := statsd.NewNoopClient(nil) fromAddress, _ := mail.ParseAddress("happy sender ") - m := New("", "", "", "", *fromAddress, stats) + log := blog.UseMock() + m := New("", "", "", "", *fromAddress, log, stats, 0, 0) m.clk = fc m.csprgSource = fakeSource{} messageBytes, err := m.generateMessage([]string{"recv@email.com"}, "test subject", "this is the body\n") @@ -48,9 +51,10 @@ func TestGenerateMessage(t *testing.T) { } func TestFailNonASCIIAddress(t *testing.T) { + log := blog.UseMock() stats, _ := statsd.NewNoopClient(nil) fromAddress, _ := mail.ParseAddress("send@email.com") - m := New("", "", "", "", *fromAddress, stats) + m := New("", "", "", "", *fromAddress, log, stats, 0, 0) _, err := m.generateMessage([]string{"遗憾@email.com"}, "test subject", "this is the body\n") test.AssertError(t, err, "Allowed a non-ASCII to address incorrectly") } @@ -58,7 +62,7 @@ func TestFailNonASCIIAddress(t *testing.T) { func expect(t *testing.T, buf *bufio.Reader, expected string) error { line, _, err := buf.ReadLine() if err != nil { - t.Errorf("readline: %s\n", err) + t.Errorf("readline: %s expected: %s\n", err, expected) return err } if string(line) != expected { @@ -68,61 +72,128 @@ func expect(t *testing.T, buf *bufio.Reader, expected string) error { return nil } -func TestConnect(t *testing.T) { - port := "16632" - l, err := net.Listen("tcp", ":"+port) - if err != nil { - t.Errorf("listen: %s", err) - } - go func() { - defer func() { - err := l.Close() - if err != nil { - t.Errorf("listen.Close: %s", err) - } - }() - for { - conn, err := l.Accept() - if err != nil { - return - } - go func() { - defer func() { - err := conn.Close() - if err != nil { - t.Errorf("conn.Close: %s", err) - } - }() - buf := bufio.NewReader(conn) - // we can ignore write errors because any - // failures will be caught on the connecting - // side - _, _ = conn.Write([]byte("220 smtp.example.com ESMTP\n")) - if err := expect(t, buf, "EHLO localhost"); err != nil { - return - } +type connHandler func(int, *testing.T, net.Conn) - _, _ = conn.Write([]byte("250-PIPELINING\n")) - _, _ = conn.Write([]byte("250-AUTH PLAIN LOGIN\n")) - _, _ = conn.Write([]byte("250 8BITMIME\n")) - // Base64 encoding of "user@example.com\0paswd" - if err := expect(t, buf, "AUTH PLAIN AHVzZXJAZXhhbXBsZS5jb20AcGFzd2Q="); err != nil { - return - } - _, _ = conn.Write([]byte("235 2.7.0 Authentication successful\n")) - }() +func listenForever(l net.Listener, t *testing.T, handler connHandler) { + connID := 0 + for { + conn, err := l.Accept() + if err != nil { + return + } + connID++ + go handler(connID, t, conn) + } +} + +func authenticateClient(t *testing.T, conn net.Conn) { + buf := bufio.NewReader(conn) + // we can ignore write errors because any + // failures will be caught on the connecting + // side + _, _ = conn.Write([]byte("220 smtp.example.com ESMTP\n")) + if err := expect(t, buf, "EHLO localhost"); err != nil { + return + } + + _, _ = conn.Write([]byte("250-PIPELINING\n")) + _, _ = conn.Write([]byte("250-AUTH PLAIN LOGIN\n")) + _, _ = conn.Write([]byte("250 8BITMIME\n")) + // Base64 encoding of "user@example.com\0paswd" + if err := expect(t, buf, "AUTH PLAIN AHVzZXJAZXhhbXBsZS5jb20AcGFzd2Q="); err != nil { + return + } + _, _ = conn.Write([]byte("235 2.7.0 Authentication successful\n")) +} + +// The normal handler authenticates the client and then disconnects without +// further command processing. It is sufficient for TestConnect() +func normalHandler(connID int, t *testing.T, conn net.Conn) { + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("conn.Close: %s", err) } }() + authenticateClient(t, conn) +} + +// The disconnectHandler authenticates the client like the normalHandler but +// additionally processes an email flow (e.g. MAIL, RCPT and DATA commands). +// When the `connID` is <= `closeFirst` the connection is closed immediately +// after the MAIL command is received and prior to issuing a 250 response. In +// this way the first `closeFirst` connections will not complete normally and +// can be tested for reconnection logic. +func disconnectHandler(closeFirst int) connHandler { + return func(connID int, t *testing.T, conn net.Conn) { + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("conn.Close: %s", err) + } + }() + authenticateClient(t, conn) + + buf := bufio.NewReader(conn) + if err := expect(t, buf, "MAIL FROM:<> BODY=8BITMIME"); err != nil { + return + } + + if connID <= closeFirst { + fmt.Printf("Cutting off client early\n") + return + } + _, _ = conn.Write([]byte("250 Sure. Go on. \r\n")) + + if err := expect(t, buf, "RCPT TO:"); err != nil { + return + } + _, _ = conn.Write([]byte("250 Tell Me More \r\n")) + + if err := expect(t, buf, "DATA"); err != nil { + return + } + _, _ = conn.Write([]byte("354 Cool Data\r\n")) + _, _ = conn.Write([]byte("250 Peace Out\r\n")) + } +} + +func setup(t *testing.T) (*MailerImpl, net.Listener, func()) { + const port = "16632" stats, _ := statsd.NewNoopClient(nil) - fromAddress, _ := mail.ParseAddress("send@email.com") + fromAddress, _ := mail.ParseAddress("you-are-a-winner@example.com") + log := blog.UseMock() + m := New( "localhost", port, "user@example.com", "paswd", *fromAddress, - stats) - err = m.Connect() + log, + stats, + time.Second*2, time.Second*10) + + l, err := net.Listen("tcp", ":"+port) + if err != nil { + t.Fatalf("listen: %s", err) + } + cleanUp := func() { + err := l.Close() + if err != nil { + t.Errorf("listen.Close: %s", err) + } + } + + return m, l, cleanUp +} + +func TestConnect(t *testing.T) { + m, l, cleanUp := setup(t) + defer cleanUp() + + go listenForever(l, t, normalHandler) + err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } @@ -131,3 +202,25 @@ func TestConnect(t *testing.T) { t.Errorf("Failed to clean up: %s", err) } } + +func TestReconnectSuccess(t *testing.T) { + m, l, cleanUp := setup(t) + defer cleanUp() + const closedConns = 5 + + // Configure a test server that will disconnect the first `closedConns` + // connections after the MAIL cmd + go listenForever(l, t, disconnectHandler(closedConns)) + + // With a mailer client that has a max attempt > `closedConns` we expect no + // error. The message should be delivered after `closedConns` reconnect + // attempts. + err := m.Connect() + if err != nil { + t.Errorf("Failed to connect: %s", err) + } + err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + if err != nil { + t.Errorf("Expected SendMail() to not fail. Got err: %s", err) + } +} diff --git a/test/mail-test-srv/http.go b/test/mail-test-srv/http.go index ffcb78153..89aac8d63 100644 --- a/test/mail-test-srv/http.go +++ b/test/mail-test-srv/http.go @@ -33,26 +33,26 @@ func (f *toFilter) Match(m rcvdMail) bool { /mail/1?to=foo@bar.com - second mail for foo@bar.com */ -func setupHTTP(serveMux *http.ServeMux) { - serveMux.HandleFunc("/count", httpCount) - serveMux.HandleFunc("/clear", httpClear) - serveMux.Handle("/mail/", http.StripPrefix("/mail/", http.HandlerFunc(httpGetMail))) +func (s *mailSrv) setupHTTP(serveMux *http.ServeMux) { + serveMux.HandleFunc("/count", s.httpCount) + serveMux.HandleFunc("/clear", s.httpClear) + serveMux.Handle("/mail/", http.StripPrefix("/mail/", http.HandlerFunc(s.httpGetMail))) } -func httpClear(w http.ResponseWriter, r *http.Request) { +func (s *mailSrv) httpClear(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { - allMailMutex.Lock() - allReceivedMail = nil - allMailMutex.Unlock() + s.allMailMutex.Lock() + s.allReceivedMail = nil + s.allMailMutex.Unlock() w.WriteHeader(200) } else { w.WriteHeader(405) } } -func httpCount(w http.ResponseWriter, r *http.Request) { +func (s *mailSrv) httpCount(w http.ResponseWriter, r *http.Request) { count := 0 - iterMail(extractFilter(r), func(m rcvdMail) bool { + s.iterMail(extractFilter(r), func(m rcvdMail) bool { count++ return false }) @@ -61,7 +61,7 @@ func httpCount(w http.ResponseWriter, r *http.Request) { var rgxGetMail = regexp.MustCompile(`^(\d+)/?$`) -func httpGetMail(w http.ResponseWriter, r *http.Request) { +func (s *mailSrv) httpGetMail(w http.ResponseWriter, r *http.Request) { mailNum, err := strconv.Atoi(strings.Trim(r.URL.Path, "/")) if err != nil { w.WriteHeader(400) @@ -69,7 +69,7 @@ func httpGetMail(w http.ResponseWriter, r *http.Request) { return } idx := 0 - found := iterMail(extractFilter(r), func(m rcvdMail) bool { + found := s.iterMail(extractFilter(r), func(m rcvdMail) bool { if mailNum == idx { printMail(w, m) return true @@ -88,10 +88,10 @@ func extractFilter(r *http.Request) toFilter { return toFilter{To: to} } -func iterMail(f toFilter, cb func(rcvdMail) bool) bool { - allMailMutex.Lock() - defer allMailMutex.Unlock() - for _, v := range allReceivedMail { +func (s *mailSrv) iterMail(f toFilter, cb func(rcvdMail) bool) bool { + s.allMailMutex.Lock() + defer s.allMailMutex.Unlock() + for _, v := range s.allReceivedMail { if !f.Match(v) { continue } diff --git a/test/mail-test-srv/http_test.go b/test/mail-test-srv/http_test.go index d4d0bfb08..7d4b972a4 100644 --- a/test/mail-test-srv/http_test.go +++ b/test/mail-test-srv/http_test.go @@ -21,29 +21,31 @@ func reqAndRecorder(t testing.TB, method, relativeUrl string, body io.Reader) (* } func TestHTTPClear(t *testing.T) { + srv := mailSrv{} w, r := reqAndRecorder(t, "POST", "/clear", nil) - allReceivedMail = []rcvdMail{rcvdMail{}} - httpClear(w, r) + srv.allReceivedMail = []rcvdMail{rcvdMail{}} + srv.httpClear(w, r) if w.Code != 200 { t.Errorf("expected 200, got %d", w.Code) } - if len(allReceivedMail) != 0 { + if len(srv.allReceivedMail) != 0 { t.Error("/clear failed to clear mail buffer") } w, r = reqAndRecorder(t, "GET", "/clear", nil) - allReceivedMail = []rcvdMail{rcvdMail{}} - httpClear(w, r) + srv.allReceivedMail = []rcvdMail{rcvdMail{}} + srv.httpClear(w, r) if w.Code != 405 { t.Errorf("expected 405, got %d", w.Code) } - if len(allReceivedMail) != 1 { + if len(srv.allReceivedMail) != 1 { t.Error("GET /clear cleared the mail buffer") } } func TestHTTPCount(t *testing.T) { - allReceivedMail = []rcvdMail{ + srv := mailSrv{} + srv.allReceivedMail = []rcvdMail{ rcvdMail{From: "a", To: "b"}, rcvdMail{From: "a", To: "b"}, rcvdMail{From: "a", To: "c"}, @@ -66,7 +68,7 @@ func TestHTTPCount(t *testing.T) { buf.Reset() w.Body = &buf - httpCount(w, r) + srv.httpCount(w, r) if w.Code != 200 { t.Errorf("%s: expected 200, got %d", test.URL, w.Code) } diff --git a/test/mail-test-srv/main.go b/test/mail-test-srv/main.go index 75da93027..3044889f3 100644 --- a/test/mail-test-srv/main.go +++ b/test/mail-test-srv/main.go @@ -12,11 +12,19 @@ import ( "regexp" "strings" "sync" + "time" blog "github.com/letsencrypt/boulder/log" ) -var listenAPI = flag.String("http", "0.0.0.0:9381", "http port to listen on") +type mailSrv struct { + closeFirst uint + closeDuration time.Duration + allReceivedMail []rcvdMail + allMailMutex sync.Mutex + connNumber uint + connNumberMutex sync.RWMutex +} type rcvdMail struct { From string @@ -24,9 +32,6 @@ type rcvdMail struct { Mail string } -var allReceivedMail []rcvdMail -var allMailMutex sync.Mutex - func expectLine(buf *bufio.Reader, expected string) error { line, _, err := buf.ReadLine() if err != nil { @@ -44,8 +49,11 @@ var dataRegex = regexp.MustCompile("^DATA\\s*$") var smtpErr501 = []byte("501 syntax error in parameters or arguments \r\n") var smtpOk250 = []byte("250 OK \r\n") -func handleConn(conn net.Conn) { +func (srv *mailSrv) handleConn(conn net.Conn) { defer conn.Close() + srv.connNumberMutex.Lock() + srv.connNumber++ + srv.connNumberMutex.Unlock() auditlogger := blog.Get() auditlogger.Info(fmt.Sprintf("mail-test-srv: Got connection from %s", conn.RemoteAddr())) @@ -91,6 +99,15 @@ func handleConn(conn net.Conn) { case "NOOP": conn.Write(smtpOk250) case "MAIL": + srv.connNumberMutex.RLock() + if srv.connNumber <= srv.closeFirst { + log.Printf( + "mail-test-srv: connection # %d < -closeFirst parameter %d, disconnecting client. Bye!\n", + srv.connNumber, srv.closeFirst) + clearState() + conn.Close() + } + srv.connNumberMutex.RUnlock() clearState() matches := mailFromRegex.FindStringSubmatch(line) if matches == nil { @@ -135,13 +152,13 @@ func handleConn(conn net.Conn) { From: fromAddr, Mail: msgBuf.String(), } - allMailMutex.Lock() + srv.allMailMutex.Lock() for _, rcpt := range toAddr { mailResult.To = rcpt - allReceivedMail = append(allReceivedMail, mailResult) + srv.allReceivedMail = append(srv.allReceivedMail, mailResult) log.Printf("mail-test-srv: Got mail: %s -> %s\n", fromAddr, rcpt) } - allMailMutex.Unlock() + srv.allMailMutex.Unlock() conn.Write([]byte("250 Got mail \r\n")) clearState() } @@ -151,24 +168,33 @@ func handleConn(conn net.Conn) { } } -func serveSMTP(l net.Listener) error { +func (srv *mailSrv) serveSMTP(l net.Listener) error { for { conn, err := l.Accept() if err != nil { return err } - go handleConn(conn) + go srv.handleConn(conn) } } func main() { - l, err := net.Listen("tcp", "0.0.0.0:9380") + var listenAPI = flag.String("http", "0.0.0.0:9381", "http port to listen on") + var listenSMTP = flag.String("smtp", "0.0.0.0:9380", "smtp port to listen on") + var closeFirst = flag.Uint("closeFirst", 0, "close first n connections after MAIL for reconnection tests") + + flag.Parse() + l, err := net.Listen("tcp", *listenSMTP) if err != nil { - log.Fatalln("Couldn't bind for SMTP", err) + log.Fatalln("Couldn't bind %q for SMTP", *listenSMTP, err) } defer l.Close() - setupHTTP(http.DefaultServeMux) + srv := mailSrv{ + closeFirst: *closeFirst, + } + + srv.setupHTTP(http.DefaultServeMux) go func() { err := http.ListenAndServe(*listenAPI, http.DefaultServeMux) if err != nil { @@ -176,7 +202,7 @@ func main() { } }() - err = serveSMTP(l) + err = srv.serveSMTP(l) if err != nil { log.Fatalln(err, "Failed to accept connection") } diff --git a/test/startservers.py b/test/startservers.py index 38f6dbeb8..014bc3b80 100644 --- a/test/startservers.py +++ b/test/startservers.py @@ -57,7 +57,7 @@ def start(race_detection): 'ocsp-responder --config %s' % os.path.join(default_config_dir, "ocsp-responder.json"), 'ct-test-srv', 'dns-test-srv', - 'mail-test-srv', + 'mail-test-srv --closeFirst 5', 'ocsp-responder --config test/issuer-ocsp-responder.json', 'caa-checker --config cmd/caa-checker/test-config.yml' ]