[chore] Small nits in batch processor, use generics to avoid type conversions (#11501)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
4884781299
commit
8f20a07847
|
|
@ -37,7 +37,7 @@ var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher
|
|||
// Batches are sent out with any of the following conditions:
|
||||
// - batch size reaches cfg.SendBatchSize
|
||||
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
|
||||
type batchProcessor struct {
|
||||
type batchProcessor[T any] struct {
|
||||
logger *zap.Logger
|
||||
timeout time.Duration
|
||||
sendBatchSize int
|
||||
|
|
@ -45,7 +45,7 @@ type batchProcessor struct {
|
|||
|
||||
// batchFunc is a factory for new batch objects corresponding
|
||||
// with the appropriate signal.
|
||||
batchFunc func() batch
|
||||
batchFunc func() batch[T]
|
||||
|
||||
shutdownC chan struct{}
|
||||
goroutines sync.WaitGroup
|
||||
|
|
@ -53,16 +53,16 @@ type batchProcessor struct {
|
|||
telemetry *batchProcessorTelemetry
|
||||
|
||||
// batcher will be either *singletonBatcher or *multiBatcher
|
||||
batcher batcher
|
||||
batcher batcher[T]
|
||||
}
|
||||
|
||||
// batcher is describes a *singletonBatcher or *multiBatcher.
|
||||
type batcher interface {
|
||||
type batcher[T any] interface {
|
||||
// start initializes background resources used by this batcher.
|
||||
start(ctx context.Context) error
|
||||
|
||||
// consume incorporates a new item of data into the pending batch.
|
||||
consume(ctx context.Context, data any) error
|
||||
consume(ctx context.Context, data T) error
|
||||
|
||||
// currentMetadataCardinality returns the number of shards.
|
||||
currentMetadataCardinality() int
|
||||
|
|
@ -71,10 +71,10 @@ type batcher interface {
|
|||
// shard is a single instance of the batch logic. When metadata
|
||||
// keys are in use, one of these is created per distinct combination
|
||||
// of values.
|
||||
type shard struct {
|
||||
type shard[T any] struct {
|
||||
// processor refers to this processor, for access to common
|
||||
// configuration.
|
||||
processor *batchProcessor
|
||||
processor *batchProcessor[T]
|
||||
|
||||
// exportCtx is a context with the metadata key-values
|
||||
// corresponding with this shard set.
|
||||
|
|
@ -84,44 +84,40 @@ type shard struct {
|
|||
timer *time.Timer
|
||||
|
||||
// newItem is used to receive data items from producers.
|
||||
newItem chan any
|
||||
newItem chan T
|
||||
|
||||
// batch is an in-flight data item containing one of the
|
||||
// underlying data types.
|
||||
batch batch
|
||||
batch batch[T]
|
||||
}
|
||||
|
||||
// batch is an interface generalizing the individual signal types.
|
||||
type batch interface {
|
||||
type batch[T any] interface {
|
||||
// export the current batch
|
||||
export(ctx context.Context, req any) error
|
||||
export(ctx context.Context, req T) error
|
||||
|
||||
// splitBatch returns a full request built from pending items.
|
||||
splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req any)
|
||||
splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req T)
|
||||
|
||||
// itemCount returns the size of the current batch
|
||||
itemCount() int
|
||||
|
||||
// add item to the current batch
|
||||
add(item any)
|
||||
add(item T)
|
||||
|
||||
// sizeBytes counts the OTLP encoding size of the batch
|
||||
sizeBytes(item any) int
|
||||
sizeBytes(item T) int
|
||||
}
|
||||
|
||||
var _ consumer.Traces = (*batchProcessor)(nil)
|
||||
var _ consumer.Metrics = (*batchProcessor)(nil)
|
||||
var _ consumer.Logs = (*batchProcessor)(nil)
|
||||
|
||||
// newBatchProcessor returns a new batch processor component.
|
||||
func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() batch) (*batchProcessor, error) {
|
||||
func newBatchProcessor[T any](set processor.Settings, cfg *Config, batchFunc func() batch[T]) (*batchProcessor[T], error) {
|
||||
// use lower-case, to be consistent with http/2 headers.
|
||||
mks := make([]string, len(cfg.MetadataKeys))
|
||||
for i, k := range cfg.MetadataKeys {
|
||||
mks[i] = strings.ToLower(k)
|
||||
}
|
||||
sort.Strings(mks)
|
||||
bp := &batchProcessor{
|
||||
bp := &batchProcessor[T]{
|
||||
logger: set.Logger,
|
||||
|
||||
sendBatchSize: int(cfg.SendBatchSize),
|
||||
|
|
@ -131,11 +127,11 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
|
|||
shutdownC: make(chan struct{}, 1),
|
||||
}
|
||||
if len(mks) == 0 {
|
||||
bp.batcher = &singleShardBatcher{
|
||||
bp.batcher = &singleShardBatcher[T]{
|
||||
processor: bp,
|
||||
}
|
||||
} else {
|
||||
bp.batcher = &multiShardBatcher{
|
||||
bp.batcher = &multiShardBatcher[T]{
|
||||
metadataKeys: mks,
|
||||
metadataLimit: int(cfg.MetadataCardinalityLimit),
|
||||
processor: bp,
|
||||
|
|
@ -152,30 +148,30 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
|
|||
}
|
||||
|
||||
// newShard gets or creates a batcher corresponding with attrs.
|
||||
func (bp *batchProcessor) newShard(md map[string][]string) *shard {
|
||||
func (bp *batchProcessor[T]) newShard(md map[string][]string) *shard[T] {
|
||||
exportCtx := client.NewContext(context.Background(), client.Info{
|
||||
Metadata: client.NewMetadata(md),
|
||||
})
|
||||
b := &shard{
|
||||
b := &shard[T]{
|
||||
processor: bp,
|
||||
newItem: make(chan any, runtime.NumCPU()),
|
||||
newItem: make(chan T, runtime.NumCPU()),
|
||||
exportCtx: exportCtx,
|
||||
batch: bp.batchFunc(),
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (bp *batchProcessor) Capabilities() consumer.Capabilities {
|
||||
func (bp *batchProcessor[T]) Capabilities() consumer.Capabilities {
|
||||
return consumer.Capabilities{MutatesData: true}
|
||||
}
|
||||
|
||||
// Start is invoked during service startup.
|
||||
func (bp *batchProcessor) Start(ctx context.Context, _ component.Host) error {
|
||||
func (bp *batchProcessor[T]) Start(ctx context.Context, _ component.Host) error {
|
||||
return bp.batcher.start(ctx)
|
||||
}
|
||||
|
||||
// Shutdown is invoked during service shutdown.
|
||||
func (bp *batchProcessor) Shutdown(context.Context) error {
|
||||
func (bp *batchProcessor[T]) Shutdown(context.Context) error {
|
||||
close(bp.shutdownC)
|
||||
|
||||
// Wait until all goroutines are done.
|
||||
|
|
@ -183,12 +179,12 @@ func (bp *batchProcessor) Shutdown(context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *shard) start() {
|
||||
func (b *shard[T]) start() {
|
||||
b.processor.goroutines.Add(1)
|
||||
go b.startLoop()
|
||||
}
|
||||
|
||||
func (b *shard) startLoop() {
|
||||
func (b *shard[T]) startLoop() {
|
||||
defer b.processor.goroutines.Done()
|
||||
|
||||
// timerCh ensures we only block when there is a
|
||||
|
|
@ -218,9 +214,6 @@ func (b *shard) startLoop() {
|
|||
}
|
||||
return
|
||||
case item := <-b.newItem:
|
||||
if item == nil {
|
||||
continue
|
||||
}
|
||||
b.processItem(item)
|
||||
case <-timerCh:
|
||||
if b.batch.itemCount() > 0 {
|
||||
|
|
@ -231,7 +224,7 @@ func (b *shard) startLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *shard) processItem(item any) {
|
||||
func (b *shard[T]) processItem(item T) {
|
||||
b.batch.add(item)
|
||||
sent := false
|
||||
for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) {
|
||||
|
|
@ -245,23 +238,23 @@ func (b *shard) processItem(item any) {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *shard) hasTimer() bool {
|
||||
func (b *shard[T]) hasTimer() bool {
|
||||
return b.timer != nil
|
||||
}
|
||||
|
||||
func (b *shard) stopTimer() {
|
||||
func (b *shard[T]) stopTimer() {
|
||||
if b.hasTimer() && !b.timer.Stop() {
|
||||
<-b.timer.C
|
||||
}
|
||||
}
|
||||
|
||||
func (b *shard) resetTimer() {
|
||||
func (b *shard[T]) resetTimer() {
|
||||
if b.hasTimer() {
|
||||
b.timer.Reset(b.processor.timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *shard) sendItems(trigger trigger) {
|
||||
func (b *shard[T]) sendItems(trigger trigger) {
|
||||
sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize)
|
||||
|
||||
err := b.batch.export(b.exportCtx, req)
|
||||
|
|
@ -278,28 +271,28 @@ func (b *shard) sendItems(trigger trigger) {
|
|||
|
||||
// singleShardBatcher is used when metadataKeys is empty, to avoid the
|
||||
// additional lock and map operations used in multiBatcher.
|
||||
type singleShardBatcher struct {
|
||||
processor *batchProcessor
|
||||
single *shard
|
||||
type singleShardBatcher[T any] struct {
|
||||
processor *batchProcessor[T]
|
||||
single *shard[T]
|
||||
}
|
||||
|
||||
func (sb *singleShardBatcher) start(context.Context) error {
|
||||
func (sb *singleShardBatcher[T]) start(context.Context) error {
|
||||
sb.single = sb.processor.newShard(nil)
|
||||
sb.single.start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
|
||||
func (sb *singleShardBatcher[T]) consume(_ context.Context, data T) error {
|
||||
sb.single.newItem <- data
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *singleShardBatcher) currentMetadataCardinality() int {
|
||||
func (sb *singleShardBatcher[T]) currentMetadataCardinality() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
// multiBatcher is used when metadataKeys is not empty.
|
||||
type multiShardBatcher struct {
|
||||
// multiShardBatcher is used when metadataKeys is not empty.
|
||||
type multiShardBatcher[T any] struct {
|
||||
// metadataKeys is the configured list of metadata keys. When
|
||||
// empty, the `singleton` batcher is used. When non-empty,
|
||||
// each distinct combination of metadata keys and values
|
||||
|
|
@ -309,7 +302,7 @@ type multiShardBatcher struct {
|
|||
// metadataLimit is the limiting size of the batchers map.
|
||||
metadataLimit int
|
||||
|
||||
processor *batchProcessor
|
||||
processor *batchProcessor[T]
|
||||
batchers sync.Map
|
||||
|
||||
// Guards the size and the storing logic to ensure no more than limit items are stored.
|
||||
|
|
@ -318,11 +311,11 @@ type multiShardBatcher struct {
|
|||
size int
|
||||
}
|
||||
|
||||
func (mb *multiShardBatcher) start(context.Context) error {
|
||||
func (mb *multiShardBatcher[T]) start(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
|
||||
func (mb *multiShardBatcher[T]) consume(ctx context.Context, data T) error {
|
||||
// Get each metadata key value, form the corresponding
|
||||
// attribute set for use as a map lookup key.
|
||||
info := client.FromContext(ctx)
|
||||
|
|
@ -356,49 +349,72 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
|
|||
b, loaded = mb.batchers.LoadOrStore(aset, mb.processor.newShard(md))
|
||||
if !loaded {
|
||||
// Start the goroutine only if we added the object to the map, otherwise is already started.
|
||||
b.(*shard).start()
|
||||
b.(*shard[T]).start()
|
||||
mb.size++
|
||||
}
|
||||
mb.lock.Unlock()
|
||||
}
|
||||
b.(*shard).newItem <- data
|
||||
b.(*shard[T]).newItem <- data
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mb *multiShardBatcher) currentMetadataCardinality() int {
|
||||
func (mb *multiShardBatcher[T]) currentMetadataCardinality() int {
|
||||
mb.lock.Lock()
|
||||
defer mb.lock.Unlock()
|
||||
return mb.size
|
||||
}
|
||||
|
||||
// ConsumeTraces implements processor.Traces
|
||||
func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||
return bp.batcher.consume(ctx, td)
|
||||
type tracesBatchProcessor struct {
|
||||
*batchProcessor[ptrace.Traces]
|
||||
}
|
||||
|
||||
// newTracesBatchProcessor creates a new batch processor that batches traces by size or with timeout
|
||||
func newTracesBatchProcessor(set processor.Settings, next consumer.Traces, cfg *Config) (processor.Traces, error) {
|
||||
bp, err := newBatchProcessor(set, cfg, func() batch[ptrace.Traces] { return newBatchTraces(next) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &tracesBatchProcessor{batchProcessor: bp}, nil
|
||||
}
|
||||
|
||||
func (t *tracesBatchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||
return t.batcher.consume(ctx, td)
|
||||
}
|
||||
|
||||
type metricsBatchProcessor struct {
|
||||
*batchProcessor[pmetric.Metrics]
|
||||
}
|
||||
|
||||
// newMetricsBatchProcessor creates a new batch processor that batches metrics by size or with timeout
|
||||
func newMetricsBatchProcessor(set processor.Settings, next consumer.Metrics, cfg *Config) (processor.Metrics, error) {
|
||||
bp, err := newBatchProcessor(set, cfg, func() batch[pmetric.Metrics] { return newMetricsBatch(next) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &metricsBatchProcessor{batchProcessor: bp}, nil
|
||||
}
|
||||
|
||||
// ConsumeMetrics implements processor.Metrics
|
||||
func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||
return bp.batcher.consume(ctx, md)
|
||||
func (m *metricsBatchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||
return m.batcher.consume(ctx, md)
|
||||
}
|
||||
|
||||
type logsBatchProcessor struct {
|
||||
*batchProcessor[plog.Logs]
|
||||
}
|
||||
|
||||
// newLogsBatchProcessor creates a new batch processor that batches logs by size or with timeout
|
||||
func newLogsBatchProcessor(set processor.Settings, next consumer.Logs, cfg *Config) (processor.Logs, error) {
|
||||
bp, err := newBatchProcessor(set, cfg, func() batch[plog.Logs] { return newBatchLogs(next) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &logsBatchProcessor{batchProcessor: bp}, nil
|
||||
}
|
||||
|
||||
// ConsumeLogs implements processor.Logs
|
||||
func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
|
||||
return bp.batcher.consume(ctx, ld)
|
||||
}
|
||||
|
||||
// newBatchTraces creates a new batch processor that batches traces by size or with timeout
|
||||
func newBatchTracesProcessor(set processor.Settings, next consumer.Traces, cfg *Config) (*batchProcessor, error) {
|
||||
return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) })
|
||||
}
|
||||
|
||||
// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
|
||||
func newBatchMetricsProcessor(set processor.Settings, next consumer.Metrics, cfg *Config) (*batchProcessor, error) {
|
||||
return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) })
|
||||
}
|
||||
|
||||
// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
|
||||
func newBatchLogsProcessor(set processor.Settings, next consumer.Logs, cfg *Config) (*batchProcessor, error) {
|
||||
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) })
|
||||
func (l *logsBatchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
|
||||
return l.batcher.consume(ctx, ld)
|
||||
}
|
||||
|
||||
type batchTraces struct {
|
||||
|
|
@ -413,8 +429,7 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
|
|||
}
|
||||
|
||||
// add updates current batchTraces by adding new TraceData object
|
||||
func (bt *batchTraces) add(item any) {
|
||||
td := item.(ptrace.Traces)
|
||||
func (bt *batchTraces) add(td ptrace.Traces) {
|
||||
newSpanCount := td.SpanCount()
|
||||
if newSpanCount == 0 {
|
||||
return
|
||||
|
|
@ -424,29 +439,28 @@ func (bt *batchTraces) add(item any) {
|
|||
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
|
||||
}
|
||||
|
||||
func (bt *batchTraces) sizeBytes(data any) int {
|
||||
return bt.sizer.TracesSize(data.(ptrace.Traces))
|
||||
func (bt *batchTraces) sizeBytes(td ptrace.Traces) int {
|
||||
return bt.sizer.TracesSize(td)
|
||||
}
|
||||
|
||||
func (bt *batchTraces) export(ctx context.Context, req any) error {
|
||||
td := req.(ptrace.Traces)
|
||||
func (bt *batchTraces) export(ctx context.Context, td ptrace.Traces) error {
|
||||
return bt.nextConsumer.ConsumeTraces(ctx, td)
|
||||
}
|
||||
|
||||
func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
|
||||
var req ptrace.Traces
|
||||
func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, ptrace.Traces) {
|
||||
var td ptrace.Traces
|
||||
var sent int
|
||||
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
|
||||
req = splitTraces(sendBatchMaxSize, bt.traceData)
|
||||
td = splitTraces(sendBatchMaxSize, bt.traceData)
|
||||
bt.spanCount -= sendBatchMaxSize
|
||||
sent = sendBatchMaxSize
|
||||
} else {
|
||||
req = bt.traceData
|
||||
td = bt.traceData
|
||||
sent = bt.spanCount
|
||||
bt.traceData = ptrace.NewTraces()
|
||||
bt.spanCount = 0
|
||||
}
|
||||
return sent, req
|
||||
return sent, td
|
||||
}
|
||||
|
||||
func (bt *batchTraces) itemCount() int {
|
||||
|
|
@ -460,43 +474,40 @@ type batchMetrics struct {
|
|||
sizer pmetric.Sizer
|
||||
}
|
||||
|
||||
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
|
||||
func newMetricsBatch(nextConsumer consumer.Metrics) *batchMetrics {
|
||||
return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}}
|
||||
}
|
||||
|
||||
func (bm *batchMetrics) sizeBytes(data any) int {
|
||||
return bm.sizer.MetricsSize(data.(pmetric.Metrics))
|
||||
func (bm *batchMetrics) sizeBytes(md pmetric.Metrics) int {
|
||||
return bm.sizer.MetricsSize(md)
|
||||
}
|
||||
|
||||
func (bm *batchMetrics) export(ctx context.Context, req any) error {
|
||||
md := req.(pmetric.Metrics)
|
||||
func (bm *batchMetrics) export(ctx context.Context, md pmetric.Metrics) error {
|
||||
return bm.nextConsumer.ConsumeMetrics(ctx, md)
|
||||
}
|
||||
|
||||
func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
|
||||
var req pmetric.Metrics
|
||||
func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, pmetric.Metrics) {
|
||||
var md pmetric.Metrics
|
||||
var sent int
|
||||
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
|
||||
req = splitMetrics(sendBatchMaxSize, bm.metricData)
|
||||
md = splitMetrics(sendBatchMaxSize, bm.metricData)
|
||||
bm.dataPointCount -= sendBatchMaxSize
|
||||
sent = sendBatchMaxSize
|
||||
} else {
|
||||
req = bm.metricData
|
||||
md = bm.metricData
|
||||
sent = bm.dataPointCount
|
||||
bm.metricData = pmetric.NewMetrics()
|
||||
bm.dataPointCount = 0
|
||||
}
|
||||
|
||||
return sent, req
|
||||
return sent, md
|
||||
}
|
||||
|
||||
func (bm *batchMetrics) itemCount() int {
|
||||
return bm.dataPointCount
|
||||
}
|
||||
|
||||
func (bm *batchMetrics) add(item any) {
|
||||
md := item.(pmetric.Metrics)
|
||||
|
||||
func (bm *batchMetrics) add(md pmetric.Metrics) {
|
||||
newDataPointCount := md.DataPointCount()
|
||||
if newDataPointCount == 0 {
|
||||
return
|
||||
|
|
@ -516,39 +527,36 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
|
|||
return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}}
|
||||
}
|
||||
|
||||
func (bl *batchLogs) sizeBytes(data any) int {
|
||||
return bl.sizer.LogsSize(data.(plog.Logs))
|
||||
func (bl *batchLogs) sizeBytes(ld plog.Logs) int {
|
||||
return bl.sizer.LogsSize(ld)
|
||||
}
|
||||
|
||||
func (bl *batchLogs) export(ctx context.Context, req any) error {
|
||||
ld := req.(plog.Logs)
|
||||
func (bl *batchLogs) export(ctx context.Context, ld plog.Logs) error {
|
||||
return bl.nextConsumer.ConsumeLogs(ctx, ld)
|
||||
}
|
||||
|
||||
func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
|
||||
var req plog.Logs
|
||||
func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, plog.Logs) {
|
||||
var ld plog.Logs
|
||||
var sent int
|
||||
|
||||
if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
|
||||
req = splitLogs(sendBatchMaxSize, bl.logData)
|
||||
ld = splitLogs(sendBatchMaxSize, bl.logData)
|
||||
bl.logCount -= sendBatchMaxSize
|
||||
sent = sendBatchMaxSize
|
||||
} else {
|
||||
req = bl.logData
|
||||
ld = bl.logData
|
||||
sent = bl.logCount
|
||||
bl.logData = plog.NewLogs()
|
||||
bl.logCount = 0
|
||||
}
|
||||
return sent, req
|
||||
return sent, ld
|
||||
}
|
||||
|
||||
func (bl *batchLogs) itemCount() int {
|
||||
return bl.logCount
|
||||
}
|
||||
|
||||
func (bl *batchLogs) add(item any) {
|
||||
ld := item.(plog.Logs)
|
||||
|
||||
func (bl *batchLogs) add(ld plog.Logs) {
|
||||
newLogsCount := ld.LogRecordCount()
|
||||
if newLogsCount == 0 {
|
||||
return
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
|
|||
cfg.SendBatchSize = 128
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -129,7 +129,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
|
|||
cfg.SendBatchMaxSize = 130
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -183,7 +183,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
|
|||
cfg.Timeout = 500 * time.Millisecond
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -304,7 +304,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
|
|||
cfg.Timeout = 500 * time.Millisecond
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -442,7 +442,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -489,7 +489,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -520,7 +520,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -576,7 +576,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
|
|||
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -686,7 +686,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
|
|||
dataPointsPerMetric := 2
|
||||
sendBatchMaxSize := 99
|
||||
|
||||
batchMetrics := newBatchMetrics(sink)
|
||||
batchMetrics := newMetricsBatch(sink)
|
||||
md := testdata.GenerateMetrics(metricsCount)
|
||||
|
||||
batchMetrics.add(md)
|
||||
|
|
@ -710,7 +710,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -759,7 +759,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -858,7 +858,7 @@ func runMetricsProcessorBenchmark(b *testing.B, cfg Config) {
|
|||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
metricsPerRequest := 1000
|
||||
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(b, err)
|
||||
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -905,7 +905,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -959,7 +959,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
|
|||
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -1073,7 +1073,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -1122,7 +1122,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {
|
|||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -1201,7 +1201,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
|
|||
cfg.MetadataKeys = []string{"token1", "token2"}
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -1293,7 +1293,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
|
|||
cfg.MetadataKeys = []string{"token"}
|
||||
cfg.MetadataCardinalityLimit = cardLimit
|
||||
creationSet := processortest.NewNopSettings()
|
||||
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
|
|
@ -1336,7 +1336,7 @@ func TestBatchZeroConfig(t *testing.T) {
|
|||
sink := new(consumertest.LogsSink)
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
|
||||
|
|
@ -1377,7 +1377,7 @@ func TestBatchSplitOnly(t *testing.T) {
|
|||
sink := new(consumertest.LogsSink)
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ func createTraces(
|
|||
cfg component.Config,
|
||||
nextConsumer consumer.Traces,
|
||||
) (processor.Traces, error) {
|
||||
return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config))
|
||||
return newTracesBatchProcessor(set, nextConsumer, cfg.(*Config))
|
||||
}
|
||||
|
||||
func createMetrics(
|
||||
|
|
@ -58,7 +58,7 @@ func createMetrics(
|
|||
cfg component.Config,
|
||||
nextConsumer consumer.Metrics,
|
||||
) (processor.Metrics, error) {
|
||||
return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config))
|
||||
return newMetricsBatchProcessor(set, nextConsumer, cfg.(*Config))
|
||||
}
|
||||
|
||||
func createLogs(
|
||||
|
|
@ -67,5 +67,5 @@ func createLogs(
|
|||
cfg component.Config,
|
||||
nextConsumer consumer.Logs,
|
||||
) (processor.Logs, error) {
|
||||
return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config))
|
||||
return newLogsBatchProcessor(set, nextConsumer, cfg.(*Config))
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue