boulder/email/exporter.go

164 lines
5.1 KiB
Go

package email
import (
"context"
"errors"
"sync"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/letsencrypt/boulder/core"
emailpb "github.com/letsencrypt/boulder/email/proto"
berrors "github.com/letsencrypt/boulder/errors"
blog "github.com/letsencrypt/boulder/log"
)
// contactsQueueCap limits the queue size to prevent unbounded growth. This
// value is adjustable as needed. Each RFC 5321 email address, encoded in UTF-8,
// is at most 320 bytes. Storing 10,000 emails requires ~3.44 MB of memory.
const contactsQueueCap = 10000
var ErrQueueFull = errors.New("email-exporter queue is full")
// ExporterImpl implements the gRPC server and processes email exports.
type ExporterImpl struct {
emailpb.UnsafeExporterServer
sync.Mutex
drainWG sync.WaitGroup
// wake is used to signal workers when new emails are enqueued in toSend.
// The sync.Cond docs note that "For many simple use cases, users will be
// better off using channels." However, channels enforce FIFO ordering,
// while this implementation uses a LIFO queue. Making channels behave as
// LIFO would require extra complexity. Using a slice and broadcasting is
// simpler and achieves exactly what we need.
wake *sync.Cond
toSend []string
maxConcurrentRequests int
limiter *rate.Limiter
client PardotClient
emailsHandledCounter prometheus.Counter
log blog.Logger
}
var _ emailpb.ExporterServer = (*ExporterImpl)(nil)
// NewExporterImpl initializes an ExporterImpl with the given client and
// configuration. Both perDayLimit and maxConcurrentRequests should be
// distributed proportionally among instances based on their share of the daily
// request cap. For example, if the total daily limit is 50,000 and one instance
// is assigned 40% (20,000 requests), it should also receive 40% of the max
// concurrent requests (e.g., 2 out of 5). For more details, see:
// https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits
func NewExporterImpl(client PardotClient, perDayLimit float64, maxConcurrentRequests int, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl {
limiter := rate.NewLimiter(rate.Limit(perDayLimit/86400.0), maxConcurrentRequests)
emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "email_exporter_emails_handled",
Help: "Total number of emails handled by the email exporter",
})
scope.MustRegister(emailsHandledCounter)
impl := &ExporterImpl{
maxConcurrentRequests: maxConcurrentRequests,
limiter: limiter,
toSend: make([]string, 0, contactsQueueCap),
client: client,
emailsHandledCounter: emailsHandledCounter,
log: logger,
}
impl.wake = sync.NewCond(&impl.Mutex)
queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "email_exporter_queue_length",
Help: "Current length of the email export queue",
}, func() float64 {
impl.Lock()
defer impl.Unlock()
return float64(len(impl.toSend))
})
scope.MustRegister(queueGauge)
return impl
}
// SendContacts enqueues the provided email addresses. If the queue cannot
// accommodate the new emails, an ErrQueueFull is returned.
func (impl *ExporterImpl) SendContacts(ctx context.Context, req *emailpb.SendContactsRequest) (*emptypb.Empty, error) {
if core.IsAnyNilOrZero(req, req.Emails) {
return nil, berrors.InternalServerError("Incomplete gRPC request message")
}
impl.Lock()
defer impl.Unlock()
spotsLeft := contactsQueueCap - len(impl.toSend)
if spotsLeft < len(req.Emails) {
return nil, ErrQueueFull
}
impl.toSend = append(impl.toSend, req.Emails...)
// Wake waiting workers to process the new emails.
impl.wake.Broadcast()
return &emptypb.Empty{}, nil
}
// Start begins asynchronous processing of the email queue. When the parent
// daemonCtx is cancelled the queue will be drained and the workers will exit.
func (impl *ExporterImpl) Start(daemonCtx context.Context) {
go func() {
<-daemonCtx.Done()
// Wake waiting workers to exit.
impl.wake.Broadcast()
}()
worker := func() {
defer impl.drainWG.Done()
for {
impl.Lock()
for len(impl.toSend) == 0 && daemonCtx.Err() == nil {
// Wait for the queue to be updated or the daemon to exit.
impl.wake.Wait()
}
if len(impl.toSend) == 0 && daemonCtx.Err() != nil {
// No more emails to process, exit.
impl.Unlock()
return
}
// Dequeue and dispatch an email.
last := len(impl.toSend) - 1
email := impl.toSend[last]
impl.toSend = impl.toSend[:last]
impl.Unlock()
err := impl.limiter.Wait(daemonCtx)
if err != nil && !errors.Is(err, context.Canceled) {
impl.log.Errf("Unexpected limiter.Wait() error: %s", err)
continue
}
err = impl.client.SendContact(email)
if err != nil {
impl.log.Errf("Sending Contact to Pardot: %s", err)
}
impl.emailsHandledCounter.Inc()
}
}
for range impl.maxConcurrentRequests {
impl.drainWG.Add(1)
go worker()
}
}
// Drain blocks until all workers have finished processing the email queue.
func (impl *ExporterImpl) Drain() {
impl.drainWG.Wait()
}