Change multiBatcher to use sync.Map, avoid global lock on fast path (#7714)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent e79db959c3
commit e5a1330342
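The title describes the core change: the per-metadata-set shard map now lives in a sync.Map, so the common case (the shard for a metadata combination already exists) is a lock-free Load, and the mutex is taken only when a new shard may have to be created. A minimal standalone sketch of that pattern, using hypothetical names (registry, get) rather than the processor's actual types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTooManyShards = errors.New("too many shards")

// registry is a hypothetical, stripped-down stand-in for the new multi-shard
// batcher: reads go through sync.Map.Load without any lock, and the mutex is
// taken only when a key is seen for the first time, to enforce the
// cardinality limit while creating the shard.
type registry struct {
	shards sync.Map // key -> shard (here just *int)

	mu    sync.Mutex // guards size on the slow path only
	size  int
	limit int
}

func (r *registry) get(key string) (*int, error) {
	// Fast path: existing keys never touch the mutex.
	if v, ok := r.shards.Load(key); ok {
		return v.(*int), nil
	}

	// Slow path: lock, check the limit, then create-or-adopt the shard.
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.limit != 0 && r.size >= r.limit {
		return nil, errTooManyShards
	}
	v, loaded := r.shards.LoadOrStore(key, new(int))
	if !loaded {
		r.size++
	}
	return v.(*int), nil
}

func main() {
	r := &registry{limit: 2}
	for _, k := range []string{"a", "a", "b", "c"} {
		_, err := r.get(k)
		fmt.Println(k, err)
	}
	// expected: a <nil>, a <nil>, b <nil>, c too many shards
}

LoadOrStore keeps two concurrent first-time callers from creating duplicate shards, while the mutex keeps the size counter consistent with the limit check.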
@@ -0,0 +1,11 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: batchprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Change multiBatcher to use sync.Map, avoid global lock on fast path
+
+# One or more tracking issues or pull requests related to the change
+issues: [7714]
@@ -61,42 +61,28 @@ type batchProcessor struct {
 
 	telemetry *batchProcessorTelemetry
 
-	// batcherFinder will be either *singletonBatcher or *multiBatcher
-	batcherFinder
+	// batcher will be either *singletonBatcher or *multiBatcher
+	batcher batcher
 }
 
-type batcherFinder interface {
-	findBatcher(ctx context.Context) (*batcher, error)
+type batcher interface {
+	consume(ctx context.Context, data any) error
 	currentMetadataCardinality() int
 }
 
-// singleBatcher is used when metadataKeys is empty, to avoid the
-// additional lock and map operations used in multiBatcher.
-type singleBatcher struct {
-	*batcher
-}
-
-// multiBatcher is used when metadataKeys is not empty.
-type multiBatcher struct {
-	*batchProcessor
-
-	lock sync.Mutex
-	batchers map[attribute.Set]*batcher
-}
-
-// batcher is a single instance of the batcher logic. When metadata
+// shard is a single instance of the batch logic. When metadata
 // keys are in use, one of these is created per distinct combination
 // of values.
-type batcher struct {
+type shard struct {
 	// processor refers to this processor, for access to common
 	// configuration.
 	processor *batchProcessor
 
 	// exportCtx is a context with the metadata key-values
-	// corresponding with this batcher set.
+	// corresponding with this shard set.
 	exportCtx context.Context
 
-	// timer informs the batcher send a batch.
+	// timer informs the shard send a batch.
 	timer *time.Timer
 
 	// newItem is used to receive data items from producers.
@@ -143,15 +129,14 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
 		metadataLimit: int(cfg.MetadataCardinalityLimit),
 	}
 	if len(bp.metadataKeys) == 0 {
-		bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)}
+		bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
 	} else {
-		bp.batcherFinder = &multiBatcher{
+		bp.batcher = &multiShardBatcher{
 			batchProcessor: bp,
-			batchers: map[attribute.Set]*batcher{},
 		}
 	}
 
-	bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel)
+	bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel)
 	if err != nil {
 		return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
 	}
@@ -160,12 +145,12 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
 	return bp, nil
 }
 
-// newBatcher gets or creates a batcher corresponding with attrs.
-func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher {
+// newShard gets or creates a batcher corresponding with attrs.
+func (bp *batchProcessor) newShard(md map[string][]string) *shard {
 	exportCtx := client.NewContext(context.Background(), client.Info{
 		Metadata: client.NewMetadata(md),
 	})
-	b := &batcher{
+	b := &shard{
 		processor: bp,
 		newItem: make(chan any, runtime.NumCPU()),
 		exportCtx: exportCtx,
@@ -194,7 +179,7 @@ func (bp *batchProcessor) Shutdown(context.Context) error {
 	return nil
 }
 
-func (b *batcher) start() {
+func (b *shard) start() {
 	defer b.processor.goroutines.Done()
 
 	// timerCh ensures we only block when there is a
@@ -237,7 +222,7 @@ func (b *batcher) start() {
 	}
 }
 
-func (b *batcher) processItem(item any) {
+func (b *shard) processItem(item any) {
 	b.batch.add(item)
 	sent := false
 	for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
@@ -251,23 +236,23 @@ func (b *batcher) processItem(item any) {
 	}
 }
 
-func (b *batcher) hasTimer() bool {
+func (b *shard) hasTimer() bool {
 	return b.timer != nil
 }
 
-func (b *batcher) stopTimer() {
+func (b *shard) stopTimer() {
 	if b.hasTimer() && !b.timer.Stop() {
 		<-b.timer.C
 	}
 }
 
-func (b *batcher) resetTimer() {
+func (b *shard) resetTimer() {
 	if b.hasTimer() {
 		b.timer.Reset(b.processor.timeout)
 	}
 }
 
-func (b *batcher) sendItems(trigger trigger) {
+func (b *shard) sendItems(trigger trigger) {
 	sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
 	if err != nil {
 		b.processor.logger.Warn("Sender failed", zap.Error(err))
@@ -276,11 +261,33 @@ func (b *batcher) sendItems(trigger trigger) {
 	}
 }
 
-func (sb *singleBatcher) findBatcher(_ context.Context) (*batcher, error) {
-	return sb.batcher, nil
-}
-
-func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
+// singleShardBatcher is used when metadataKeys is empty, to avoid the
+// additional lock and map operations used in multiBatcher.
+type singleShardBatcher struct {
+	batcher *shard
+}
+
+func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
+	sb.batcher.newItem <- data
+	return nil
+}
+
+func (sb *singleShardBatcher) currentMetadataCardinality() int {
+	return 1
+}
+
+// multiBatcher is used when metadataKeys is not empty.
+type multiShardBatcher struct {
+	*batchProcessor
+	batchers sync.Map
+
+	// Guards the size and the storing logic to ensure no more than limit items are stored.
+	// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
+	lock sync.Mutex
+	size int
+}
+
+func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
 	// Get each metadata key value, form the corresponding
 	// attribute set for use as a map lookup key.
 	info := client.FromContext(ctx)
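The comment added to multiShardBatcher above notes that the remaining mutex exists only to keep size exactly within the cardinality limit, and that it could be dropped, at the cost of occasionally overshooting the limit, by making the counter atomic. A rough sketch of that alternative, again with hypothetical names (shardSet, get) and not code from this change:

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errTooManyShards = errors.New("too many shards")

// shardSet sketches the lock-free alternative the comment alludes to: the
// cardinality counter is atomic, so a burst of concurrent first-time keys can
// briefly overshoot the limit before the counter catches up.
type shardSet struct {
	shards sync.Map     // key -> shard (here just *int)
	size   atomic.Int64 // approximate count of stored shards
	limit  int64
}

func (s *shardSet) get(key string) (*int, error) {
	if v, ok := s.shards.Load(key); ok {
		return v.(*int), nil // fast path: no lock, no atomic write
	}
	if s.limit != 0 && s.size.Load() >= s.limit {
		return nil, errTooManyShards
	}
	v, loaded := s.shards.LoadOrStore(key, new(int))
	if !loaded {
		s.size.Add(1)
	}
	return v.(*int), nil
}

func main() {
	s := &shardSet{limit: 1}
	_, err1 := s.get("tenant=a")
	_, err2 := s.get("tenant=b")
	fmt.Println(err1, err2) // expected: <nil> too many shards
}

Whether the brief overshoot is acceptable is exactly the trade-off the comment leaves open.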
@@ -300,63 +307,46 @@ func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) {
 	}
 	aset := attribute.NewSet(attrs...)
 
-	mb.lock.Lock()
-	defer mb.lock.Unlock()
-
-	b, ok := mb.batchers[aset]
-	if ok {
-		return b, nil
-	}
-
-	if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= limit {
-		return nil, errTooManyBatchers
-	}
-
-	// aset.ToSlice() returns the sorted, deduplicated,
-	// and name-downcased list of attributes.
-	b = mb.newBatcher(md)
-	mb.batchers[aset] = b
-	return b, nil
-}
-
-func (sb *singleBatcher) currentMetadataCardinality() int {
-	return 1
-}
-
-func (mb *multiBatcher) currentMetadataCardinality() int {
+	b, ok := mb.batchers.Load(aset)
+	if !ok {
+		mb.lock.Lock()
+		if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
+			mb.lock.Unlock()
+			return errTooManyBatchers
+		}
+
+		// aset.ToSlice() returns the sorted, deduplicated,
+		// and name-downcased list of attributes.
+		var loaded bool
+		b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
+		if !loaded {
+			mb.size++
+		}
+		mb.lock.Unlock()
+	}
+	b.(*shard).newItem <- data
+	return nil
+}
+
+func (mb *multiShardBatcher) currentMetadataCardinality() int {
 	mb.lock.Lock()
 	defer mb.lock.Unlock()
-	return len(mb.batchers)
+	return mb.size
 }
 
 // ConsumeTraces implements TracesProcessor
 func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
-	b, err := bp.findBatcher(ctx)
-	if err != nil {
-		return err
-	}
-	b.newItem <- td
-	return nil
+	return bp.batcher.consume(ctx, td)
 }
 
 // ConsumeMetrics implements MetricsProcessor
 func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
-	b, err := bp.findBatcher(ctx)
-	if err != nil {
-		return err
-	}
-	b.newItem <- md
-	return nil
+	return bp.batcher.consume(ctx, md)
 }
 
 // ConsumeLogs implements LogsProcessor
 func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
-	b, err := bp.findBatcher(ctx)
-	if err != nil {
-		return err
-	}
-	b.newItem <- ld
-	return nil
+	return bp.batcher.consume(ctx, ld)
 }
 
 // newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
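The consume fast path above depends on attribute.Set being a comparable value that can serve directly as the sync.Map key; attribute.NewSet sorts and deduplicates its inputs, so equivalent metadata always produces the same key. A small standalone example of that behavior (illustrative only, not part of this change; the printed result is the expected one):

package main

import (
	"fmt"
	"sync"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	var m sync.Map

	// NewSet sorts and deduplicates its inputs, so sets built from the same
	// key-values in different order resolve to the same map key.
	a := attribute.NewSet(attribute.String("tenant", "a"), attribute.String("region", "eu"))
	b := attribute.NewSet(attribute.String("region", "eu"), attribute.String("tenant", "a"))

	m.Store(a, "shard-1")
	v, ok := m.Load(b)
	fmt.Println(v, ok) // expected: shard-1 true
}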