email/exporter: Add email.Exporter gRPC service (#8017)
Initial implementation of the email.Exporter gRPC service to be used by the new cmd/email-exporter. Part of #7966
This commit is contained in:
parent
49ebc99e8e
commit
6b85b3480b
|
@ -0,0 +1,163 @@
|
|||
package email
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
"github.com/letsencrypt/boulder/core"
|
||||
emailpb "github.com/letsencrypt/boulder/email/proto"
|
||||
berrors "github.com/letsencrypt/boulder/errors"
|
||||
blog "github.com/letsencrypt/boulder/log"
|
||||
)
|
||||
|
||||
// contactsQueueCap limits the queue size to prevent unbounded growth. This
|
||||
// value is adjustable as needed. Each RFC 5321 email address, encoded in UTF-8,
|
||||
// is at most 320 bytes. Storing 10,000 emails requires ~3.44 MB of memory.
|
||||
const contactsQueueCap = 10000
|
||||
|
||||
var ErrQueueFull = errors.New("email-exporter queue is full")
|
||||
|
||||
// ExporterImpl implements the gRPC server and processes email exports.
|
||||
type ExporterImpl struct {
|
||||
emailpb.UnsafeExporterServer
|
||||
|
||||
sync.Mutex
|
||||
drainWG sync.WaitGroup
|
||||
// wake is used to signal workers when new emails are enqueued in toSend.
|
||||
// The sync.Cond docs note that "For many simple use cases, users will be
|
||||
// better off using channels." However, channels enforce FIFO ordering,
|
||||
// while this implementation uses a LIFO queue. Making channels behave as
|
||||
// LIFO would require extra complexity. Using a slice and broadcasting is
|
||||
// simpler and achieves exactly what we need.
|
||||
wake *sync.Cond
|
||||
toSend []string
|
||||
|
||||
maxConcurrentRequests int
|
||||
limiter *rate.Limiter
|
||||
client PardotClient
|
||||
emailsHandledCounter prometheus.Counter
|
||||
log blog.Logger
|
||||
}
|
||||
|
||||
var _ emailpb.ExporterServer = (*ExporterImpl)(nil)
|
||||
|
||||
// NewExporterImpl initializes an ExporterImpl with the given client and
|
||||
// configuration. Both perDayLimit and maxConcurrentRequests should be
|
||||
// distributed proportionally among instances based on their share of the daily
|
||||
// request cap. For example, if the total daily limit is 50,000 and one instance
|
||||
// is assigned 40% (20,000 requests), it should also receive 40% of the max
|
||||
// concurrent requests (e.g., 2 out of 5). For more details, see:
|
||||
// https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits
|
||||
func NewExporterImpl(client PardotClient, perDayLimit float64, maxConcurrentRequests int, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl {
|
||||
limiter := rate.NewLimiter(rate.Limit(perDayLimit/86400.0), maxConcurrentRequests)
|
||||
|
||||
emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "email_exporter_emails_handled",
|
||||
Help: "Total number of emails handled by the email exporter",
|
||||
})
|
||||
scope.MustRegister(emailsHandledCounter)
|
||||
|
||||
impl := &ExporterImpl{
|
||||
maxConcurrentRequests: maxConcurrentRequests,
|
||||
limiter: limiter,
|
||||
toSend: make([]string, 0, contactsQueueCap),
|
||||
client: client,
|
||||
emailsHandledCounter: emailsHandledCounter,
|
||||
log: logger,
|
||||
}
|
||||
impl.wake = sync.NewCond(&impl.Mutex)
|
||||
|
||||
queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Name: "email_exporter_queue_length",
|
||||
Help: "Current length of the email export queue",
|
||||
}, func() float64 {
|
||||
impl.Lock()
|
||||
defer impl.Unlock()
|
||||
return float64(len(impl.toSend))
|
||||
})
|
||||
scope.MustRegister(queueGauge)
|
||||
|
||||
return impl
|
||||
}
|
||||
|
||||
// SendContacts enqueues the provided email addresses. If the queue cannot
|
||||
// accommodate the new emails, an ErrQueueFull is returned.
|
||||
func (impl *ExporterImpl) SendContacts(ctx context.Context, req *emailpb.SendContactsRequest) (*emptypb.Empty, error) {
|
||||
if core.IsAnyNilOrZero(req, req.Emails) {
|
||||
return nil, berrors.InternalServerError("Incomplete gRPC request message")
|
||||
}
|
||||
|
||||
impl.Lock()
|
||||
defer impl.Unlock()
|
||||
|
||||
spotsLeft := contactsQueueCap - len(impl.toSend)
|
||||
if spotsLeft < len(req.Emails) {
|
||||
return nil, ErrQueueFull
|
||||
}
|
||||
impl.toSend = append(impl.toSend, req.Emails...)
|
||||
// Wake waiting workers to process the new emails.
|
||||
impl.wake.Broadcast()
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
// Start begins asynchronous processing of the email queue. When the parent
|
||||
// daemonCtx is cancelled the queue will be drained and the workers will exit.
|
||||
func (impl *ExporterImpl) Start(daemonCtx context.Context) {
|
||||
go func() {
|
||||
<-daemonCtx.Done()
|
||||
// Wake waiting workers to exit.
|
||||
impl.wake.Broadcast()
|
||||
}()
|
||||
|
||||
worker := func() {
|
||||
defer impl.drainWG.Done()
|
||||
for {
|
||||
impl.Lock()
|
||||
|
||||
for len(impl.toSend) == 0 && daemonCtx.Err() == nil {
|
||||
// Wait for the queue to be updated or the daemon to exit.
|
||||
impl.wake.Wait()
|
||||
}
|
||||
|
||||
if len(impl.toSend) == 0 && daemonCtx.Err() != nil {
|
||||
// No more emails to process, exit.
|
||||
impl.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Dequeue and dispatch an email.
|
||||
last := len(impl.toSend) - 1
|
||||
email := impl.toSend[last]
|
||||
impl.toSend = impl.toSend[:last]
|
||||
impl.Unlock()
|
||||
|
||||
err := impl.limiter.Wait(daemonCtx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
impl.log.Errf("Unexpected limiter.Wait() error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = impl.client.SendContact(email)
|
||||
if err != nil {
|
||||
impl.log.Errf("Sending Contact to Pardot: %s", err)
|
||||
}
|
||||
impl.emailsHandledCounter.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
for range impl.maxConcurrentRequests {
|
||||
impl.drainWG.Add(1)
|
||||
go worker()
|
||||
}
|
||||
}
|
||||
|
||||
// Drain blocks until all workers have finished processing the email queue.
|
||||
func (impl *ExporterImpl) Drain() {
|
||||
impl.drainWG.Wait()
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package email
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
emailpb "github.com/letsencrypt/boulder/email/proto"
|
||||
blog "github.com/letsencrypt/boulder/log"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
// mockPardotClientImpl is a mock implementation of PardotClient.
|
||||
type mockPardotClientImpl struct {
|
||||
sync.Mutex
|
||||
CreatedContacts []string
|
||||
}
|
||||
|
||||
// newMockPardotClientImpl returns a MockPardotClientImpl, implementing the
|
||||
// PardotClient interface. Both refer to the same instance, with the interface
|
||||
// for mock interaction and the struct for state inspection and modification.
|
||||
func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) {
|
||||
mockImpl := &mockPardotClientImpl{
|
||||
CreatedContacts: []string{},
|
||||
}
|
||||
return mockImpl, mockImpl
|
||||
}
|
||||
|
||||
// SendContact adds an email to CreatedContacts.
|
||||
func (m *mockPardotClientImpl) SendContact(email string) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.CreatedContacts = append(m.CreatedContacts, email)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockPardotClientImpl) getCreatedContacts() []string {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Return a copy to avoid race conditions.
|
||||
return slices.Clone(m.CreatedContacts)
|
||||
}
|
||||
|
||||
// setup creates a new ExporterImpl, a MockPardotClientImpl, and the start and
|
||||
// cleanup functions for the ExporterImpl. Call start() to begin processing the
|
||||
// ExporterImpl queue and cleanup() to drain and shutdown. If start() is called,
|
||||
// cleanup() must be called.
|
||||
func setup() (*ExporterImpl, *mockPardotClientImpl, func(), func()) {
|
||||
mockClient, clientImpl := newMockPardotClientImpl()
|
||||
exporter := NewExporterImpl(mockClient, 1000000, 5, metrics.NoopRegisterer, blog.NewMock())
|
||||
daemonCtx, cancel := context.WithCancel(context.Background())
|
||||
return exporter, clientImpl,
|
||||
func() { exporter.Start(daemonCtx) },
|
||||
func() {
|
||||
cancel()
|
||||
exporter.Drain()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendContacts(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
exporter, clientImpl, start, cleanup := setup()
|
||||
start()
|
||||
defer cleanup()
|
||||
|
||||
wantContacts := []string{"test@example.com", "user@example.com"}
|
||||
_, err := exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
|
||||
Emails: wantContacts,
|
||||
})
|
||||
test.AssertNotError(t, err, "Error creating contacts")
|
||||
|
||||
var gotContacts []string
|
||||
for range 100 {
|
||||
gotContacts = clientImpl.getCreatedContacts()
|
||||
if len(gotContacts) == 2 {
|
||||
break
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
test.AssertSliceContains(t, gotContacts, wantContacts[0])
|
||||
test.AssertSliceContains(t, gotContacts, wantContacts[1])
|
||||
}
|
||||
|
||||
func TestSendContactsQueueFull(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
exporter, _, start, cleanup := setup()
|
||||
start()
|
||||
defer cleanup()
|
||||
|
||||
var err error
|
||||
for range contactsQueueCap * 2 {
|
||||
_, err = exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
|
||||
Emails: []string{"test@example.com"},
|
||||
})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
test.AssertErrorIs(t, err, ErrQueueFull)
|
||||
}
|
||||
|
||||
func TestSendContactsQueueDrains(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
exporter, clientImpl, start, cleanup := setup()
|
||||
start()
|
||||
|
||||
var emails []string
|
||||
for i := range 100 {
|
||||
emails = append(emails, fmt.Sprintf("test@%d.example.com", i))
|
||||
}
|
||||
|
||||
_, err := exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
|
||||
Emails: emails,
|
||||
})
|
||||
test.AssertNotError(t, err, "Error creating contacts")
|
||||
|
||||
// Drain the queue.
|
||||
cleanup()
|
||||
|
||||
test.AssertEquals(t, 100, len(clientImpl.getCreatedContacts()))
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.34.1
|
||||
// protoc v3.20.1
|
||||
// source: exporter.proto
|
||||
|
||||
package proto
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
emptypb "google.golang.org/protobuf/types/known/emptypb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||
)
|
||||
|
||||
type SendContactsRequest struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Emails []string `protobuf:"bytes,1,rep,name=emails,proto3" json:"emails,omitempty"`
|
||||
}
|
||||
|
||||
func (x *SendContactsRequest) Reset() {
|
||||
*x = SendContactsRequest{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_exporter_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *SendContactsRequest) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*SendContactsRequest) ProtoMessage() {}
|
||||
|
||||
func (x *SendContactsRequest) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_exporter_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use SendContactsRequest.ProtoReflect.Descriptor instead.
|
||||
func (*SendContactsRequest) Descriptor() ([]byte, []int) {
|
||||
return file_exporter_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *SendContactsRequest) GetEmails() []string {
|
||||
if x != nil {
|
||||
return x.Emails
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var File_exporter_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_exporter_proto_rawDesc = []byte{
|
||||
0x0a, 0x0e, 0x65, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x12, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2d, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74,
|
||||
0x61, 0x63, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x65,
|
||||
0x6d, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x65, 0x6d, 0x61,
|
||||
0x69, 0x6c, 0x73, 0x32, 0x4e, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x12,
|
||||
0x42, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x63, 0x74, 0x73, 0x12,
|
||||
0x1a, 0x2e, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74,
|
||||
0x61, 0x63, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
|
||||
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
|
||||
0x70, 0x74, 0x79, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
||||
0x6d, 0x2f, 0x6c, 0x65, 0x74, 0x73, 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x2f, 0x62, 0x6f,
|
||||
0x75, 0x6c, 0x64, 0x65, 0x72, 0x2f, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
file_exporter_proto_rawDescOnce sync.Once
|
||||
file_exporter_proto_rawDescData = file_exporter_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_exporter_proto_rawDescGZIP() []byte {
|
||||
file_exporter_proto_rawDescOnce.Do(func() {
|
||||
file_exporter_proto_rawDescData = protoimpl.X.CompressGZIP(file_exporter_proto_rawDescData)
|
||||
})
|
||||
return file_exporter_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_exporter_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
|
||||
var file_exporter_proto_goTypes = []interface{}{
|
||||
(*SendContactsRequest)(nil), // 0: email.SendContactsRequest
|
||||
(*emptypb.Empty)(nil), // 1: google.protobuf.Empty
|
||||
}
|
||||
var file_exporter_proto_depIdxs = []int32{
|
||||
0, // 0: email.Exporter.SendContacts:input_type -> email.SendContactsRequest
|
||||
1, // 1: email.Exporter.SendContacts:output_type -> google.protobuf.Empty
|
||||
1, // [1:2] is the sub-list for method output_type
|
||||
0, // [0:1] is the sub-list for method input_type
|
||||
0, // [0:0] is the sub-list for extension type_name
|
||||
0, // [0:0] is the sub-list for extension extendee
|
||||
0, // [0:0] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_exporter_proto_init() }
|
||||
func file_exporter_proto_init() {
|
||||
if File_exporter_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_exporter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*SendContactsRequest); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_exporter_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 1,
|
||||
NumExtensions: 0,
|
||||
NumServices: 1,
|
||||
},
|
||||
GoTypes: file_exporter_proto_goTypes,
|
||||
DependencyIndexes: file_exporter_proto_depIdxs,
|
||||
MessageInfos: file_exporter_proto_msgTypes,
|
||||
}.Build()
|
||||
File_exporter_proto = out.File
|
||||
file_exporter_proto_rawDesc = nil
|
||||
file_exporter_proto_goTypes = nil
|
||||
file_exporter_proto_depIdxs = nil
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package email;
|
||||
option go_package = "github.com/letsencrypt/boulder/email/proto";
|
||||
|
||||
import "google/protobuf/empty.proto";
|
||||
|
||||
service Exporter {
|
||||
rpc SendContacts (SendContactsRequest) returns (google.protobuf.Empty);
|
||||
}
|
||||
|
||||
message SendContactsRequest {
|
||||
repeated string emails = 1;
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.3.0
|
||||
// - protoc v3.20.1
|
||||
// source: exporter.proto
|
||||
|
||||
package proto
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
emptypb "google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
Exporter_SendContacts_FullMethodName = "/email.Exporter/SendContacts"
|
||||
)
|
||||
|
||||
// ExporterClient is the client API for Exporter service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type ExporterClient interface {
|
||||
SendContacts(ctx context.Context, in *SendContactsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
}
|
||||
|
||||
type exporterClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewExporterClient(cc grpc.ClientConnInterface) ExporterClient {
|
||||
return &exporterClient{cc}
|
||||
}
|
||||
|
||||
func (c *exporterClient) SendContacts(ctx context.Context, in *SendContactsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, Exporter_SendContacts_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ExporterServer is the server API for Exporter service.
|
||||
// All implementations must embed UnimplementedExporterServer
|
||||
// for forward compatibility
|
||||
type ExporterServer interface {
|
||||
SendContacts(context.Context, *SendContactsRequest) (*emptypb.Empty, error)
|
||||
mustEmbedUnimplementedExporterServer()
|
||||
}
|
||||
|
||||
// UnimplementedExporterServer must be embedded to have forward compatible implementations.
|
||||
type UnimplementedExporterServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedExporterServer) SendContacts(context.Context, *SendContactsRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SendContacts not implemented")
|
||||
}
|
||||
func (UnimplementedExporterServer) mustEmbedUnimplementedExporterServer() {}
|
||||
|
||||
// UnsafeExporterServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ExporterServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeExporterServer interface {
|
||||
mustEmbedUnimplementedExporterServer()
|
||||
}
|
||||
|
||||
func RegisterExporterServer(s grpc.ServiceRegistrar, srv ExporterServer) {
|
||||
s.RegisterService(&Exporter_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _Exporter_SendContacts_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SendContactsRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ExporterServer).SendContacts(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: Exporter_SendContacts_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ExporterServer).SendContacts(ctx, req.(*SendContactsRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// Exporter_ServiceDesc is the grpc.ServiceDesc for Exporter service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var Exporter_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "email.Exporter",
|
||||
HandlerType: (*ExporterServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "SendContacts",
|
||||
Handler: _Exporter_SendContacts_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "exporter.proto",
|
||||
}
|
1
go.mod
1
go.mod
|
@ -85,6 +85,7 @@ require (
|
|||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
golang.org/x/mod v0.18.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/time v0.10.0
|
||||
golang.org/x/tools v0.22.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -422,6 +422,8 @@ golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
|||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
|
||||
golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
Copyright 2009 The Go Authors.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google LLC nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
@ -0,0 +1,22 @@
|
|||
Additional IP Rights Grant (Patents)
|
||||
|
||||
"This implementation" means the copyrightable works distributed by
|
||||
Google as part of the Go project.
|
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive,
|
||||
no-charge, royalty-free, irrevocable (except as stated in this section)
|
||||
patent license to make, have made, use, offer to sell, sell, import,
|
||||
transfer and otherwise run, modify and propagate the contents of this
|
||||
implementation of Go, where such license applies only to those patent
|
||||
claims, both currently owned or controlled by Google and acquired in
|
||||
the future, licensable by Google that are necessarily infringed by this
|
||||
implementation of Go. This grant does not include claims that would be
|
||||
infringed only as a consequence of further modification of this
|
||||
implementation. If you or your agent or exclusive licensee institute or
|
||||
order or agree to the institution of patent litigation against any
|
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging
|
||||
that this implementation of Go or any code incorporated within this
|
||||
implementation of Go constitutes direct or contributory patent
|
||||
infringement, or inducement of patent infringement, then any patent
|
||||
rights granted to you under this License for this implementation of Go
|
||||
shall terminate as of the date such litigation is filed.
|
|
@ -0,0 +1,426 @@
|
|||
// Copyright 2015 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package rate provides a rate limiter.
|
||||
package rate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Limit defines the maximum frequency of some events.
|
||||
// Limit is represented as number of events per second.
|
||||
// A zero Limit allows no events.
|
||||
type Limit float64
|
||||
|
||||
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
|
||||
const Inf = Limit(math.MaxFloat64)
|
||||
|
||||
// Every converts a minimum time interval between events to a Limit.
|
||||
func Every(interval time.Duration) Limit {
|
||||
if interval <= 0 {
|
||||
return Inf
|
||||
}
|
||||
return 1 / Limit(interval.Seconds())
|
||||
}
|
||||
|
||||
// A Limiter controls how frequently events are allowed to happen.
|
||||
// It implements a "token bucket" of size b, initially full and refilled
|
||||
// at rate r tokens per second.
|
||||
// Informally, in any large enough time interval, the Limiter limits the
|
||||
// rate to r tokens per second, with a maximum burst size of b events.
|
||||
// As a special case, if r == Inf (the infinite rate), b is ignored.
|
||||
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
|
||||
//
|
||||
// The zero value is a valid Limiter, but it will reject all events.
|
||||
// Use NewLimiter to create non-zero Limiters.
|
||||
//
|
||||
// Limiter has three main methods, Allow, Reserve, and Wait.
|
||||
// Most callers should use Wait.
|
||||
//
|
||||
// Each of the three methods consumes a single token.
|
||||
// They differ in their behavior when no token is available.
|
||||
// If no token is available, Allow returns false.
|
||||
// If no token is available, Reserve returns a reservation for a future token
|
||||
// and the amount of time the caller must wait before using it.
|
||||
// If no token is available, Wait blocks until one can be obtained
|
||||
// or its associated context.Context is canceled.
|
||||
//
|
||||
// The methods AllowN, ReserveN, and WaitN consume n tokens.
|
||||
//
|
||||
// Limiter is safe for simultaneous use by multiple goroutines.
|
||||
type Limiter struct {
|
||||
mu sync.Mutex
|
||||
limit Limit
|
||||
burst int
|
||||
tokens float64
|
||||
// last is the last time the limiter's tokens field was updated
|
||||
last time.Time
|
||||
// lastEvent is the latest time of a rate-limited event (past or future)
|
||||
lastEvent time.Time
|
||||
}
|
||||
|
||||
// Limit returns the maximum overall event rate.
|
||||
func (lim *Limiter) Limit() Limit {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
return lim.limit
|
||||
}
|
||||
|
||||
// Burst returns the maximum burst size. Burst is the maximum number of tokens
|
||||
// that can be consumed in a single call to Allow, Reserve, or Wait, so higher
|
||||
// Burst values allow more events to happen at once.
|
||||
// A zero Burst allows no events, unless limit == Inf.
|
||||
func (lim *Limiter) Burst() int {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
return lim.burst
|
||||
}
|
||||
|
||||
// TokensAt returns the number of tokens available at time t.
|
||||
func (lim *Limiter) TokensAt(t time.Time) float64 {
|
||||
lim.mu.Lock()
|
||||
_, tokens := lim.advance(t) // does not mutate lim
|
||||
lim.mu.Unlock()
|
||||
return tokens
|
||||
}
|
||||
|
||||
// Tokens returns the number of tokens available now.
|
||||
func (lim *Limiter) Tokens() float64 {
|
||||
return lim.TokensAt(time.Now())
|
||||
}
|
||||
|
||||
// NewLimiter returns a new Limiter that allows events up to rate r and permits
|
||||
// bursts of at most b tokens.
|
||||
func NewLimiter(r Limit, b int) *Limiter {
|
||||
return &Limiter{
|
||||
limit: r,
|
||||
burst: b,
|
||||
tokens: float64(b),
|
||||
}
|
||||
}
|
||||
|
||||
// Allow reports whether an event may happen now.
|
||||
func (lim *Limiter) Allow() bool {
|
||||
return lim.AllowN(time.Now(), 1)
|
||||
}
|
||||
|
||||
// AllowN reports whether n events may happen at time t.
|
||||
// Use this method if you intend to drop / skip events that exceed the rate limit.
|
||||
// Otherwise use Reserve or Wait.
|
||||
func (lim *Limiter) AllowN(t time.Time, n int) bool {
|
||||
return lim.reserveN(t, n, 0).ok
|
||||
}
|
||||
|
||||
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
|
||||
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
|
||||
type Reservation struct {
|
||||
ok bool
|
||||
lim *Limiter
|
||||
tokens int
|
||||
timeToAct time.Time
|
||||
// This is the Limit at reservation time, it can change later.
|
||||
limit Limit
|
||||
}
|
||||
|
||||
// OK returns whether the limiter can provide the requested number of tokens
|
||||
// within the maximum wait time. If OK is false, Delay returns InfDuration, and
|
||||
// Cancel does nothing.
|
||||
func (r *Reservation) OK() bool {
|
||||
return r.ok
|
||||
}
|
||||
|
||||
// Delay is shorthand for DelayFrom(time.Now()).
|
||||
func (r *Reservation) Delay() time.Duration {
|
||||
return r.DelayFrom(time.Now())
|
||||
}
|
||||
|
||||
// InfDuration is the duration returned by Delay when a Reservation is not OK.
|
||||
const InfDuration = time.Duration(math.MaxInt64)
|
||||
|
||||
// DelayFrom returns the duration for which the reservation holder must wait
|
||||
// before taking the reserved action. Zero duration means act immediately.
|
||||
// InfDuration means the limiter cannot grant the tokens requested in this
|
||||
// Reservation within the maximum wait time.
|
||||
func (r *Reservation) DelayFrom(t time.Time) time.Duration {
|
||||
if !r.ok {
|
||||
return InfDuration
|
||||
}
|
||||
delay := r.timeToAct.Sub(t)
|
||||
if delay < 0 {
|
||||
return 0
|
||||
}
|
||||
return delay
|
||||
}
|
||||
|
||||
// Cancel is shorthand for CancelAt(time.Now()).
|
||||
func (r *Reservation) Cancel() {
|
||||
r.CancelAt(time.Now())
|
||||
}
|
||||
|
||||
// CancelAt indicates that the reservation holder will not perform the reserved action
|
||||
// and reverses the effects of this Reservation on the rate limit as much as possible,
|
||||
// considering that other reservations may have already been made.
|
||||
func (r *Reservation) CancelAt(t time.Time) {
|
||||
if !r.ok {
|
||||
return
|
||||
}
|
||||
|
||||
r.lim.mu.Lock()
|
||||
defer r.lim.mu.Unlock()
|
||||
|
||||
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
|
||||
return
|
||||
}
|
||||
|
||||
// calculate tokens to restore
|
||||
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
|
||||
// after r was obtained. These tokens should not be restored.
|
||||
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
|
||||
if restoreTokens <= 0 {
|
||||
return
|
||||
}
|
||||
// advance time to now
|
||||
t, tokens := r.lim.advance(t)
|
||||
// calculate new number of tokens
|
||||
tokens += restoreTokens
|
||||
if burst := float64(r.lim.burst); tokens > burst {
|
||||
tokens = burst
|
||||
}
|
||||
// update state
|
||||
r.lim.last = t
|
||||
r.lim.tokens = tokens
|
||||
if r.timeToAct == r.lim.lastEvent {
|
||||
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
|
||||
if !prevEvent.Before(t) {
|
||||
r.lim.lastEvent = prevEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reserve is shorthand for ReserveN(time.Now(), 1).
|
||||
func (lim *Limiter) Reserve() *Reservation {
|
||||
return lim.ReserveN(time.Now(), 1)
|
||||
}
|
||||
|
||||
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
|
||||
// The Limiter takes this Reservation into account when allowing future events.
|
||||
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
|
||||
// Usage example:
|
||||
//
|
||||
// r := lim.ReserveN(time.Now(), 1)
|
||||
// if !r.OK() {
|
||||
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
|
||||
// return
|
||||
// }
|
||||
// time.Sleep(r.Delay())
|
||||
// Act()
|
||||
//
|
||||
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
|
||||
// If you need to respect a deadline or cancel the delay, use Wait instead.
|
||||
// To drop or skip events exceeding rate limit, use Allow instead.
|
||||
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
|
||||
r := lim.reserveN(t, n, InfDuration)
|
||||
return &r
|
||||
}
|
||||
|
||||
// Wait is shorthand for WaitN(ctx, 1).
|
||||
func (lim *Limiter) Wait(ctx context.Context) (err error) {
|
||||
return lim.WaitN(ctx, 1)
|
||||
}
|
||||
|
||||
// WaitN blocks until lim permits n events to happen.
|
||||
// It returns an error if n exceeds the Limiter's burst size, the Context is
|
||||
// canceled, or the expected wait time exceeds the Context's Deadline.
|
||||
// The burst limit is ignored if the rate limit is Inf.
|
||||
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
|
||||
// The test code calls lim.wait with a fake timer generator.
|
||||
// This is the real timer generator.
|
||||
newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
|
||||
timer := time.NewTimer(d)
|
||||
return timer.C, timer.Stop, func() {}
|
||||
}
|
||||
|
||||
return lim.wait(ctx, n, time.Now(), newTimer)
|
||||
}
|
||||
|
||||
// wait is the internal implementation of WaitN.
|
||||
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
|
||||
lim.mu.Lock()
|
||||
burst := lim.burst
|
||||
limit := lim.limit
|
||||
lim.mu.Unlock()
|
||||
|
||||
if n > burst && limit != Inf {
|
||||
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
|
||||
}
|
||||
// Check if ctx is already cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
// Determine wait limit
|
||||
waitLimit := InfDuration
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
waitLimit = deadline.Sub(t)
|
||||
}
|
||||
// Reserve
|
||||
r := lim.reserveN(t, n, waitLimit)
|
||||
if !r.ok {
|
||||
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
|
||||
}
|
||||
// Wait if necessary
|
||||
delay := r.DelayFrom(t)
|
||||
if delay == 0 {
|
||||
return nil
|
||||
}
|
||||
ch, stop, advance := newTimer(delay)
|
||||
defer stop()
|
||||
advance() // only has an effect when testing
|
||||
select {
|
||||
case <-ch:
|
||||
// We can proceed.
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
// Context was canceled before we could proceed. Cancel the
|
||||
// reservation, which may permit other events to proceed sooner.
|
||||
r.Cancel()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
|
||||
func (lim *Limiter) SetLimit(newLimit Limit) {
|
||||
lim.SetLimitAt(time.Now(), newLimit)
|
||||
}
|
||||
|
||||
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
|
||||
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
|
||||
// before SetLimitAt was called.
|
||||
func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
|
||||
t, tokens := lim.advance(t)
|
||||
|
||||
lim.last = t
|
||||
lim.tokens = tokens
|
||||
lim.limit = newLimit
|
||||
}
|
||||
|
||||
// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
|
||||
func (lim *Limiter) SetBurst(newBurst int) {
|
||||
lim.SetBurstAt(time.Now(), newBurst)
|
||||
}
|
||||
|
||||
// SetBurstAt sets a new burst size for the limiter.
|
||||
func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
|
||||
t, tokens := lim.advance(t)
|
||||
|
||||
lim.last = t
|
||||
lim.tokens = tokens
|
||||
lim.burst = newBurst
|
||||
}
|
||||
|
||||
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
|
||||
// maxFutureReserve specifies the maximum reservation wait duration allowed.
|
||||
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
|
||||
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
|
||||
lim.mu.Lock()
|
||||
defer lim.mu.Unlock()
|
||||
|
||||
if lim.limit == Inf {
|
||||
return Reservation{
|
||||
ok: true,
|
||||
lim: lim,
|
||||
tokens: n,
|
||||
timeToAct: t,
|
||||
}
|
||||
}
|
||||
|
||||
t, tokens := lim.advance(t)
|
||||
|
||||
// Calculate the remaining number of tokens resulting from the request.
|
||||
tokens -= float64(n)
|
||||
|
||||
// Calculate the wait duration
|
||||
var waitDuration time.Duration
|
||||
if tokens < 0 {
|
||||
waitDuration = lim.limit.durationFromTokens(-tokens)
|
||||
}
|
||||
|
||||
// Decide result
|
||||
ok := n <= lim.burst && waitDuration <= maxFutureReserve
|
||||
|
||||
// Prepare reservation
|
||||
r := Reservation{
|
||||
ok: ok,
|
||||
lim: lim,
|
||||
limit: lim.limit,
|
||||
}
|
||||
if ok {
|
||||
r.tokens = n
|
||||
r.timeToAct = t.Add(waitDuration)
|
||||
|
||||
// Update state
|
||||
lim.last = t
|
||||
lim.tokens = tokens
|
||||
lim.lastEvent = r.timeToAct
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
// advance calculates and returns an updated state for lim resulting from the passage of time.
|
||||
// lim is not changed.
|
||||
// advance requires that lim.mu is held.
|
||||
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
|
||||
last := lim.last
|
||||
if t.Before(last) {
|
||||
last = t
|
||||
}
|
||||
|
||||
// Calculate the new number of tokens, due to time that passed.
|
||||
elapsed := t.Sub(last)
|
||||
delta := lim.limit.tokensFromDuration(elapsed)
|
||||
tokens := lim.tokens + delta
|
||||
if burst := float64(lim.burst); tokens > burst {
|
||||
tokens = burst
|
||||
}
|
||||
return t, tokens
|
||||
}
|
||||
|
||||
// durationFromTokens is a unit conversion function from the number of tokens to the duration
|
||||
// of time it takes to accumulate them at a rate of limit tokens per second.
|
||||
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
|
||||
if limit <= 0 {
|
||||
return InfDuration
|
||||
}
|
||||
|
||||
duration := (tokens / float64(limit)) * float64(time.Second)
|
||||
|
||||
// Cap the duration to the maximum representable int64 value, to avoid overflow.
|
||||
if duration > float64(math.MaxInt64) {
|
||||
return InfDuration
|
||||
}
|
||||
|
||||
return time.Duration(duration)
|
||||
}
|
||||
|
||||
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
|
||||
// which could be accumulated during that duration at a rate of limit tokens per second.
|
||||
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
|
||||
if limit <= 0 {
|
||||
return 0
|
||||
}
|
||||
return d.Seconds() * float64(limit)
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
// Copyright 2022 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package rate
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sometimes will perform an action occasionally. The First, Every, and
|
||||
// Interval fields govern the behavior of Do, which performs the action.
|
||||
// A zero Sometimes value will perform an action exactly once.
|
||||
//
|
||||
// # Example: logging with rate limiting
|
||||
//
|
||||
// var sometimes = rate.Sometimes{First: 3, Interval: 10*time.Second}
|
||||
// func Spammy() {
|
||||
// sometimes.Do(func() { log.Info("here I am!") })
|
||||
// }
|
||||
type Sometimes struct {
|
||||
First int // if non-zero, the first N calls to Do will run f.
|
||||
Every int // if non-zero, every Nth call to Do will run f.
|
||||
Interval time.Duration // if non-zero and Interval has elapsed since f's last run, Do will run f.
|
||||
|
||||
mu sync.Mutex
|
||||
count int // number of Do calls
|
||||
last time.Time // last time f was run
|
||||
}
|
||||
|
||||
// Do runs the function f as allowed by First, Every, and Interval.
|
||||
//
|
||||
// The model is a union (not intersection) of filters. The first call to Do
|
||||
// always runs f. Subsequent calls to Do run f if allowed by First or Every or
|
||||
// Interval.
|
||||
//
|
||||
// A non-zero First:N causes the first N Do(f) calls to run f.
|
||||
//
|
||||
// A non-zero Every:M causes every Mth Do(f) call, starting with the first, to
|
||||
// run f.
|
||||
//
|
||||
// A non-zero Interval causes Do(f) to run f if Interval has elapsed since
|
||||
// Do last ran f.
|
||||
//
|
||||
// Specifying multiple filters produces the union of these execution streams.
|
||||
// For example, specifying both First:N and Every:M causes the first N Do(f)
|
||||
// calls and every Mth Do(f) call, starting with the first, to run f. See
|
||||
// Examples for more.
|
||||
//
|
||||
// If Do is called multiple times simultaneously, the calls will block and run
|
||||
// serially. Therefore, Do is intended for lightweight operations.
|
||||
//
|
||||
// Because a call to Do may block until f returns, if f causes Do to be called,
|
||||
// it will deadlock.
|
||||
func (s *Sometimes) Do(f func()) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.count == 0 ||
|
||||
(s.First > 0 && s.count < s.First) ||
|
||||
(s.Every > 0 && s.count%s.Every == 0) ||
|
||||
(s.Interval > 0 && time.Since(s.last) >= s.Interval) {
|
||||
f()
|
||||
s.last = time.Now()
|
||||
}
|
||||
s.count++
|
||||
}
|
|
@ -409,6 +409,9 @@ golang.org/x/text/secure/bidirule
|
|||
golang.org/x/text/transform
|
||||
golang.org/x/text/unicode/bidi
|
||||
golang.org/x/text/unicode/norm
|
||||
# golang.org/x/time v0.10.0
|
||||
## explicit; go 1.18
|
||||
golang.org/x/time/rate
|
||||
# golang.org/x/tools v0.22.0
|
||||
## explicit; go 1.19
|
||||
golang.org/x/tools/go/gcexportdata
|
||||
|
|
Loading…
Reference in New Issue