From 0d7ea60b2cb6fa3553920d4ad3d630dbea28e66e Mon Sep 17 00:00:00 2001 From: Samantha Frank Date: Fri, 30 May 2025 17:04:35 -0400 Subject: [PATCH] email-exporter: Add an LRU cache of seen hashed email addresses (#8219) --- cmd/email-exporter/main.go | 12 +++++ email/cache.go | 68 ++++++++++++++++++++++++++++ email/exporter.go | 4 +- email/exporter_test.go | 11 +++-- email/pardot.go | 19 ++++++-- email/pardot_test.go | 42 ++++++++++++++--- test/config-next/email-exporter.json | 3 +- 7 files changed, 142 insertions(+), 17 deletions(-) create mode 100644 email/cache.go diff --git a/cmd/email-exporter/main.go b/cmd/email-exporter/main.go index 56707ef74..3600478df 100644 --- a/cmd/email-exporter/main.go +++ b/cmd/email-exporter/main.go @@ -49,6 +49,12 @@ type Config struct { // PardotBaseURL is the base URL for the Pardot API. (e.g., // "https://pi.pardot.com") PardotBaseURL string `validate:"required"` + + // EmailCacheSize controls how many hashed email addresses are retained + // in memory to prevent duplicates from being sent to the Pardot API. + // Each entry consumes ~120 bytes, so 100,000 entries use around 12 MB + // of memory. If left unset, no caching is performed. 
+ EmailCacheSize int `validate:"omitempty,min=1"` } Syslog cmd.SyslogConfig OpenTelemetry cmd.OpenTelemetryConfig @@ -87,6 +93,11 @@ func main() { clientSecret, err := c.EmailExporter.ClientSecret.Pass() cmd.FailOnError(err, "Loading clientSecret") + var cache *email.EmailCache + if c.EmailExporter.EmailCacheSize > 0 { + cache = email.NewHashedEmailCache(c.EmailExporter.EmailCacheSize, scope) + } + pardotClient, err := email.NewPardotClientImpl( clk, c.EmailExporter.PardotBusinessUnit, @@ -94,6 +105,7 @@ func main() { clientSecret, c.EmailExporter.SalesforceBaseURL, c.EmailExporter.PardotBaseURL, + cache, ) cmd.FailOnError(err, "Creating Pardot API client") exporterServer := email.NewExporterImpl(pardotClient, c.EmailExporter.PerDayLimit, c.EmailExporter.MaxConcurrentRequests, scope, logger) diff --git a/email/cache.go b/email/cache.go new file mode 100644 index 000000000..ff129cb86 --- /dev/null +++ b/email/cache.go @@ -0,0 +1,68 @@ +package email + +import ( + "crypto/sha256" + "encoding/hex" + "sync" + + "github.com/golang/groupcache/lru" + "github.com/prometheus/client_golang/prometheus" +) + +type EmailCache struct { + sync.Mutex + cache *lru.Cache + requests *prometheus.CounterVec +} + +func NewHashedEmailCache(maxEntries int, stats prometheus.Registerer) *EmailCache { + requests := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "email_cache_requests", + }, []string{"status"}) + stats.MustRegister(requests) + + return &EmailCache{ + cache: lru.New(maxEntries), + requests: requests, + } +} + +func hashEmail(email string) string { + sum := sha256.Sum256([]byte(email)) + return hex.EncodeToString(sum[:]) +} + +func (c *EmailCache) Seen(email string) bool { + if c == nil { + // If the cache is nil we assume it was not configured. 
+ return false + } + + hash := hashEmail(email) + + c.Lock() + defer c.Unlock() + + _, ok := c.cache.Get(hash) + if !ok { + c.requests.WithLabelValues("miss").Inc() + return false + } + + c.requests.WithLabelValues("hit").Inc() + return true +} + +func (c *EmailCache) Store(email string) { + if c == nil { + // If the cache is nil we assume it was not configured. + return + } + + hash := hashEmail(email) + + c.Lock() + defer c.Unlock() + + c.cache.Add(hash, nil) +} diff --git a/email/exporter.go b/email/exporter.go index 58c6d4022..9c34c5735 100644 --- a/email/exporter.go +++ b/email/exporter.go @@ -17,8 +17,8 @@ import ( // contactsQueueCap limits the queue size to prevent unbounded growth. This // value is adjustable as needed. Each RFC 5321 email address, encoded in UTF-8, -// is at most 320 bytes. Storing 10,000 emails requires ~3.44 MB of memory. -const contactsQueueCap = 10000 +// is at most 320 bytes. Storing 100,000 emails requires ~34.4 MB of memory. +const contactsQueueCap = 100000 var ErrQueueFull = errors.New("email-exporter queue is full") diff --git a/email/exporter_test.go b/email/exporter_test.go index 993195899..718a2071c 100644 --- a/email/exporter_test.go +++ b/email/exporter_test.go @@ -22,14 +22,16 @@ var ctx = context.Background() type mockPardotClientImpl struct { sync.Mutex CreatedContacts []string + cache *EmailCache } // newMockPardotClientImpl returns a MockPardotClientImpl, implementing the // PardotClient interface. Both refer to the same instance, with the interface // for mock interaction and the struct for state inspection and modification. 
-func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) { +func newMockPardotClientImpl(cache *EmailCache) (PardotClient, *mockPardotClientImpl) { mockImpl := &mockPardotClientImpl{ CreatedContacts: []string{}, + cache: cache, } return mockImpl, mockImpl } @@ -37,9 +39,10 @@ func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) { // SendContact adds an email to CreatedContacts. func (m *mockPardotClientImpl) SendContact(email string) error { m.Lock() - defer m.Unlock() - m.CreatedContacts = append(m.CreatedContacts, email) + m.Unlock() + + m.cache.Store(email) return nil } @@ -56,7 +59,7 @@ func (m *mockPardotClientImpl) getCreatedContacts() []string { // ExporterImpl queue and cleanup() to drain and shutdown. If start() is called, // cleanup() must be called. func setup() (*ExporterImpl, *mockPardotClientImpl, func(), func()) { - mockClient, clientImpl := newMockPardotClientImpl() + mockClient, clientImpl := newMockPardotClientImpl(nil) exporter := NewExporterImpl(mockClient, 1000000, 5, metrics.NoopRegisterer, blog.NewMock()) daemonCtx, cancel := context.WithCancel(context.Background()) return exporter, clientImpl, diff --git a/email/pardot.go b/email/pardot.go index c7d1db589..319f1ba74 100644 --- a/email/pardot.go +++ b/email/pardot.go @@ -63,13 +63,14 @@ type PardotClientImpl struct { contactsURL string tokenURL string token *oAuthToken + emailCache *EmailCache clk clock.Clock } var _ PardotClient = &PardotClientImpl{} // NewPardotClientImpl creates a new PardotClientImpl. 
-func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClientImpl, error) { +func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string, cache *EmailCache) (*PardotClientImpl, error) { contactsURL, err := url.JoinPath(pardotBaseURL, contactsPath) if err != nil { return nil, fmt.Errorf("failed to join contacts path: %w", err) @@ -85,9 +86,9 @@ func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, clientSecret: clientSecret, contactsURL: contactsURL, tokenURL: tokenURL, - - token: &oAuthToken{}, - clk: clk, + token: &oAuthToken{}, + emailCache: cache, + clk: clk, }, nil } @@ -144,6 +145,15 @@ func redactEmail(body []byte, email string) string { // SendContact submits an email to the Pardot Contacts endpoint, retrying up // to 3 times with exponential backoff. func (pc *PardotClientImpl) SendContact(email string) error { + if pc.emailCache.Seen(email) { + // Another goroutine has already sent this email address. + return nil + } + // There is a possible race here where two goroutines could enqueue and send + // the same email address between this check and the actual HTTP request. + // However, at an average rate of ~1 email every 2 seconds, this is unlikely + // to happen in practice. 
+ var err error for attempt := range maxAttempts { time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) @@ -183,6 +193,7 @@ func (pc *PardotClientImpl) SendContact(email string) error { defer resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { + pc.emailCache.Store(email) return nil } diff --git a/email/pardot_test.go b/email/pardot_test.go index 700ed6982..bdb8746cf 100644 --- a/email/pardot_test.go +++ b/email/pardot_test.go @@ -6,11 +6,14 @@ import ( "io" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" "github.com/jmhodges/clock" + "github.com/letsencrypt/boulder/metrics" "github.com/letsencrypt/boulder/test" + "github.com/prometheus/client_golang/prometheus" ) func defaultTokenHandler(w http.ResponseWriter, r *http.Request) { @@ -44,7 +47,7 @@ func TestSendContactSuccess(t *testing.T) { defer contactSrv.Close() clk := clock.NewFake() - client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) test.AssertNotError(t, err, "failed to create client") err = client.SendContact("test@example.com") @@ -70,7 +73,7 @@ func TestSendContactUpdateTokenFails(t *testing.T) { defer contactSrv.Close() clk := clock.NewFake() - client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) test.AssertNotError(t, err, "Failed to create client") err = client.SendContact("test@example.com") @@ -94,7 +97,7 @@ func TestSendContact4xx(t *testing.T) { defer contactSrv.Close() clk := clock.NewFake() - client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) test.AssertNotError(t, err, "Failed to 
create client") err = client.SendContact("test@example.com") @@ -142,7 +145,7 @@ func TestSendContactTokenExpiry(t *testing.T) { defer contactSrv.Close() clk := clock.NewFake() - client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) test.AssertNotError(t, err, "Failed to create client") // First call uses the initial token ("old_token"). @@ -172,7 +175,7 @@ func TestSendContactServerErrorsAfterMaxAttempts(t *testing.T) { contactSrv := httptest.NewServer(http.HandlerFunc(contactHandler)) defer contactSrv.Close() - client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) err := client.SendContact("test@example.com") test.AssertError(t, err, "Should fail after retrying all attempts") @@ -200,7 +203,7 @@ func TestSendContactRedactsEmail(t *testing.T) { defer contactSrv.Close() clk := clock.NewFake() - client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL) + client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil) test.AssertNotError(t, err, "failed to create client") err = client.SendContact(emailToTest) @@ -208,3 +211,30 @@ func TestSendContactRedactsEmail(t *testing.T) { test.AssertNotContains(t, err.Error(), emailToTest) test.AssertContains(t, err.Error(), "[REDACTED]") } + +func TestSendContactDeduplication(t *testing.T) { + t.Parallel() + + tokenSrv := httptest.NewServer(http.HandlerFunc(defaultTokenHandler)) + defer tokenSrv.Close() + + var contactHits int32 + contactSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&contactHits, 1) + w.WriteHeader(http.StatusOK) + })) + defer contactSrv.Close() + + cache := 
NewHashedEmailCache(1000, metrics.NoopRegisterer) + client, _ := NewPardotClientImpl(clock.New(), "biz", "cid", "csec", tokenSrv.URL, contactSrv.URL, cache) + + err := client.SendContact("test@example.com") + test.AssertNotError(t, err, "SendContact should succeed on first call") + test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "miss"}, 1) + + err = client.SendContact("test@example.com") + test.AssertNotError(t, err, "SendContact should succeed on second call") + + test.AssertEquals(t, int32(1), atomic.LoadInt32(&contactHits)) + test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "hit"}, 1) +} diff --git a/test/config-next/email-exporter.json b/test/config-next/email-exporter.json index 8505cc453..5652e0c1c 100644 --- a/test/config-next/email-exporter.json +++ b/test/config-next/email-exporter.json @@ -32,7 +32,8 @@ "passwordFile": "test/secrets/salesforce_client_secret" }, "salesforceBaseURL": "http://localhost:9601", - "pardotBaseURL": "http://localhost:9602" + "pardotBaseURL": "http://localhost:9602", + "emailCacheSize": 100000 }, "syslog": { "stdoutlevel": 6,