email-exporter: Add an LRU cache of seen hashed email addresses (#8219)
This commit is contained in:
parent
23608e19c5
commit
0d7ea60b2c
|
@ -49,6 +49,12 @@ type Config struct {
|
|||
// PardotBaseURL is the base URL for the Pardot API. (e.g.,
|
||||
// "https://pi.pardot.com")
|
||||
PardotBaseURL string `validate:"required"`
|
||||
|
||||
// EmailCacheSize controls how many hashed email addresses are retained
|
||||
// in memory to prevent duplicates from being sent to the Pardot API.
|
||||
// Each entry consumes ~120 bytes, so 100,000 entries uses around 12 MB
|
||||
// of memory. If left unset, no caching is performed.
|
||||
EmailCacheSize int `validate:"omitempty,min=1"`
|
||||
}
|
||||
Syslog cmd.SyslogConfig
|
||||
OpenTelemetry cmd.OpenTelemetryConfig
|
||||
|
@ -87,6 +93,11 @@ func main() {
|
|||
clientSecret, err := c.EmailExporter.ClientSecret.Pass()
|
||||
cmd.FailOnError(err, "Loading clientSecret")
|
||||
|
||||
var cache *email.EmailCache
|
||||
if c.EmailExporter.EmailCacheSize > 0 {
|
||||
cache = email.NewHashedEmailCache(c.EmailExporter.EmailCacheSize, scope)
|
||||
}
|
||||
|
||||
pardotClient, err := email.NewPardotClientImpl(
|
||||
clk,
|
||||
c.EmailExporter.PardotBusinessUnit,
|
||||
|
@ -94,6 +105,7 @@ func main() {
|
|||
clientSecret,
|
||||
c.EmailExporter.SalesforceBaseURL,
|
||||
c.EmailExporter.PardotBaseURL,
|
||||
cache,
|
||||
)
|
||||
cmd.FailOnError(err, "Creating Pardot API client")
|
||||
exporterServer := email.NewExporterImpl(pardotClient, c.EmailExporter.PerDayLimit, c.EmailExporter.MaxConcurrentRequests, scope, logger)
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package email
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/groupcache/lru"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type EmailCache struct {
|
||||
sync.Mutex
|
||||
cache *lru.Cache
|
||||
requests *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func NewHashedEmailCache(maxEntries int, stats prometheus.Registerer) *EmailCache {
|
||||
requests := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "email_cache_requests",
|
||||
}, []string{"status"})
|
||||
stats.MustRegister(requests)
|
||||
|
||||
return &EmailCache{
|
||||
cache: lru.New(maxEntries),
|
||||
requests: requests,
|
||||
}
|
||||
}
|
||||
|
||||
// hashEmail returns the lowercase hex encoding of the SHA-256 digest of
// email. The input is hashed exactly as given; no trimming or case folding is
// applied.
func hashEmail(email string) string {
	digest := sha256.Sum256([]byte(email))

	// A SHA-256 digest is sha256.Size bytes; hex doubles that.
	encoded := make([]byte, hex.EncodedLen(sha256.Size))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
|
||||
|
||||
func (c *EmailCache) Seen(email string) bool {
|
||||
if c == nil {
|
||||
// If the cache is nil we assume it was not configured.
|
||||
return false
|
||||
}
|
||||
|
||||
hash := hashEmail(email)
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
_, ok := c.cache.Get(hash)
|
||||
if !ok {
|
||||
c.requests.WithLabelValues("miss").Inc()
|
||||
return false
|
||||
}
|
||||
|
||||
c.requests.WithLabelValues("hit").Inc()
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *EmailCache) Store(email string) {
|
||||
if c == nil {
|
||||
// If the cache is nil we assume it was not configured.
|
||||
return
|
||||
}
|
||||
|
||||
hash := hashEmail(email)
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
c.cache.Add(hash, nil)
|
||||
}
|
|
@ -17,8 +17,8 @@ import (
|
|||
|
||||
// contactsQueueCap limits the queue size to prevent unbounded growth. This
// value is adjustable as needed. Each RFC 5321 email address, encoded in UTF-8,
// is at most 320 bytes. Storing 100,000 emails requires ~34.4 MB of memory.
const contactsQueueCap = 100000
|
||||
|
||||
var ErrQueueFull = errors.New("email-exporter queue is full")
|
||||
|
||||
|
|
|
@ -22,14 +22,16 @@ var ctx = context.Background()
|
|||
type mockPardotClientImpl struct {
|
||||
sync.Mutex
|
||||
CreatedContacts []string
|
||||
cache *EmailCache
|
||||
}
|
||||
|
||||
// newMockPardotClientImpl returns a MockPardotClientImpl, implementing the
|
||||
// PardotClient interface. Both refer to the same instance, with the interface
|
||||
// for mock interaction and the struct for state inspection and modification.
|
||||
func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) {
|
||||
func newMockPardotClientImpl(cache *EmailCache) (PardotClient, *mockPardotClientImpl) {
|
||||
mockImpl := &mockPardotClientImpl{
|
||||
CreatedContacts: []string{},
|
||||
cache: cache,
|
||||
}
|
||||
return mockImpl, mockImpl
|
||||
}
|
||||
|
@ -37,9 +39,10 @@ func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) {
|
|||
// SendContact adds an email to CreatedContacts.
|
||||
func (m *mockPardotClientImpl) SendContact(email string) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.CreatedContacts = append(m.CreatedContacts, email)
|
||||
m.Unlock()
|
||||
|
||||
m.cache.Store(email)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -56,7 +59,7 @@ func (m *mockPardotClientImpl) getCreatedContacts() []string {
|
|||
// ExporterImpl queue and cleanup() to drain and shutdown. If start() is called,
|
||||
// cleanup() must be called.
|
||||
func setup() (*ExporterImpl, *mockPardotClientImpl, func(), func()) {
|
||||
mockClient, clientImpl := newMockPardotClientImpl()
|
||||
mockClient, clientImpl := newMockPardotClientImpl(nil)
|
||||
exporter := NewExporterImpl(mockClient, 1000000, 5, metrics.NoopRegisterer, blog.NewMock())
|
||||
daemonCtx, cancel := context.WithCancel(context.Background())
|
||||
return exporter, clientImpl,
|
||||
|
|
|
@ -63,13 +63,14 @@ type PardotClientImpl struct {
|
|||
contactsURL string
|
||||
tokenURL string
|
||||
token *oAuthToken
|
||||
emailCache *EmailCache
|
||||
clk clock.Clock
|
||||
}
|
||||
|
||||
var _ PardotClient = &PardotClientImpl{}
|
||||
|
||||
// NewPardotClientImpl creates a new PardotClientImpl.
|
||||
func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClientImpl, error) {
|
||||
func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string, cache *EmailCache) (*PardotClientImpl, error) {
|
||||
contactsURL, err := url.JoinPath(pardotBaseURL, contactsPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to join contacts path: %w", err)
|
||||
|
@ -85,9 +86,9 @@ func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret,
|
|||
clientSecret: clientSecret,
|
||||
contactsURL: contactsURL,
|
||||
tokenURL: tokenURL,
|
||||
|
||||
token: &oAuthToken{},
|
||||
clk: clk,
|
||||
token: &oAuthToken{},
|
||||
emailCache: cache,
|
||||
clk: clk,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -144,6 +145,15 @@ func redactEmail(body []byte, email string) string {
|
|||
// SendContact submits an email to the Pardot Contacts endpoint, retrying up
|
||||
// to 3 times with exponential backoff.
|
||||
func (pc *PardotClientImpl) SendContact(email string) error {
|
||||
if pc.emailCache.Seen(email) {
|
||||
// Another goroutine has already sent this email address.
|
||||
return nil
|
||||
}
|
||||
// There is a possible race here where two goroutines could enqueue and send
|
||||
// the same email address between this check and the actual HTTP request.
|
||||
// However, at an average rate of ~1 email every 2 seconds, this is unlikely
|
||||
// to happen in practice.
|
||||
|
||||
var err error
|
||||
for attempt := range maxAttempts {
|
||||
time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase))
|
||||
|
@ -183,6 +193,7 @@ func (pc *PardotClientImpl) SendContact(email string) error {
|
|||
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||
pc.emailCache.Store(email)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,14 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func defaultTokenHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -44,7 +47,7 @@ func TestSendContactSuccess(t *testing.T) {
|
|||
defer contactSrv.Close()
|
||||
|
||||
clk := clock.NewFake()
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
test.AssertNotError(t, err, "failed to create client")
|
||||
|
||||
err = client.SendContact("test@example.com")
|
||||
|
@ -70,7 +73,7 @@ func TestSendContactUpdateTokenFails(t *testing.T) {
|
|||
defer contactSrv.Close()
|
||||
|
||||
clk := clock.NewFake()
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
test.AssertNotError(t, err, "Failed to create client")
|
||||
|
||||
err = client.SendContact("test@example.com")
|
||||
|
@ -94,7 +97,7 @@ func TestSendContact4xx(t *testing.T) {
|
|||
defer contactSrv.Close()
|
||||
|
||||
clk := clock.NewFake()
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
test.AssertNotError(t, err, "Failed to create client")
|
||||
|
||||
err = client.SendContact("test@example.com")
|
||||
|
@ -142,7 +145,7 @@ func TestSendContactTokenExpiry(t *testing.T) {
|
|||
defer contactSrv.Close()
|
||||
|
||||
clk := clock.NewFake()
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
test.AssertNotError(t, err, "Failed to create client")
|
||||
|
||||
// First call uses the initial token ("old_token").
|
||||
|
@ -172,7 +175,7 @@ func TestSendContactServerErrorsAfterMaxAttempts(t *testing.T) {
|
|||
contactSrv := httptest.NewServer(http.HandlerFunc(contactHandler))
|
||||
defer contactSrv.Close()
|
||||
|
||||
client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
|
||||
err := client.SendContact("test@example.com")
|
||||
test.AssertError(t, err, "Should fail after retrying all attempts")
|
||||
|
@ -200,7 +203,7 @@ func TestSendContactRedactsEmail(t *testing.T) {
|
|||
defer contactSrv.Close()
|
||||
|
||||
clk := clock.NewFake()
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
|
||||
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
|
||||
test.AssertNotError(t, err, "failed to create client")
|
||||
|
||||
err = client.SendContact(emailToTest)
|
||||
|
@ -208,3 +211,30 @@ func TestSendContactRedactsEmail(t *testing.T) {
|
|||
test.AssertNotContains(t, err.Error(), emailToTest)
|
||||
test.AssertContains(t, err.Error(), "[REDACTED]")
|
||||
}
|
||||
|
||||
func TestSendContactDeduplication(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tokenSrv := httptest.NewServer(http.HandlerFunc(defaultTokenHandler))
|
||||
defer tokenSrv.Close()
|
||||
|
||||
var contactHits int32
|
||||
contactSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
atomic.AddInt32(&contactHits, 1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer contactSrv.Close()
|
||||
|
||||
cache := NewHashedEmailCache(1000, metrics.NoopRegisterer)
|
||||
client, _ := NewPardotClientImpl(clock.New(), "biz", "cid", "csec", tokenSrv.URL, contactSrv.URL, cache)
|
||||
|
||||
err := client.SendContact("test@example.com")
|
||||
test.AssertNotError(t, err, "SendContact should succeed on first call")
|
||||
test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "miss"}, 1)
|
||||
|
||||
err = client.SendContact("test@example.com")
|
||||
test.AssertNotError(t, err, "SendContact should succeed on second call")
|
||||
|
||||
test.AssertEquals(t, int32(1), atomic.LoadInt32(&contactHits))
|
||||
test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "hit"}, 1)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
"passwordFile": "test/secrets/salesforce_client_secret"
|
||||
},
|
||||
"salesforceBaseURL": "http://localhost:9601",
|
||||
"pardotBaseURL": "http://localhost:9602"
|
||||
"pardotBaseURL": "http://localhost:9602",
|
||||
"emailCacheSize": 100000
|
||||
},
|
||||
"syslog": {
|
||||
"stdoutlevel": 6,
|
||||
|
|
Loading…
Reference in New Issue