Small cleanups in batchprocessor (#3013)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2021-04-26 13:52:33 -07:00 committed by GitHub
parent 38671a2e78
commit 25da8cfb98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 59 deletions

View File

@ -17,6 +17,7 @@ package batchprocessor
import (
"context"
"runtime"
"sync"
"time"
"go.opencensus.io/stats"
@ -38,21 +39,20 @@ import (
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
type batchProcessor struct {
name string
logger *zap.Logger
telemetryLevel configtelemetry.Level
sendBatchSize uint32
logger *zap.Logger
exportCtx context.Context
timer *time.Timer
timeout time.Duration
sendBatchSize uint32
sendBatchMaxSize uint32
timer *time.Timer
done chan struct{}
newItem chan interface{}
batch batch
ctx context.Context
cancel context.CancelFunc
shutdownC chan struct{}
goroutines sync.WaitGroup
telemetryLevel configtelemetry.Level
}
type batch interface {
@ -76,22 +76,23 @@ var _ consumer.Traces = (*batchProcessor)(nil)
var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)
func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) *batchProcessor {
ctx, cancel := context.WithCancel(context.Background())
func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, cfg.Name()))
if err != nil {
return nil, err
}
return &batchProcessor{
name: cfg.Name(),
logger: params.Logger,
exportCtx: exportCtx,
telemetryLevel: telemetryLevel,
sendBatchSize: cfg.SendBatchSize,
sendBatchMaxSize: cfg.SendBatchMaxSize,
timeout: cfg.Timeout,
done: make(chan struct{}, 1),
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
ctx: ctx,
cancel: cancel,
}
shutdownC: make(chan struct{}, 1),
}, nil
}
func (bp *batchProcessor) GetCapabilities() component.ProcessorCapabilities {
@ -100,24 +101,26 @@ func (bp *batchProcessor) GetCapabilities() component.ProcessorCapabilities {
// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
bp.goroutines.Add(1)
go bp.startProcessingCycle()
return nil
}
// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
bp.cancel()
close(bp.shutdownC)
// Wait until current batch is drained.
<-bp.done
// Wait until all goroutines are done.
bp.goroutines.Wait()
return nil
}
func (bp *batchProcessor) startProcessingCycle() {
defer bp.goroutines.Done()
bp.timer = time.NewTimer(bp.timeout)
for {
select {
case <-bp.ctx.Done():
case <-bp.shutdownC:
DONE:
for {
select {
@ -133,8 +136,6 @@ func (bp *batchProcessor) startProcessingCycle() {
// make it cancellable using the context that Shutdown gets as a parameter
bp.sendItems(statTimeoutTriggerSend)
}
// Indicate that we finished draining.
close(bp.done)
return
case item := <-bp.newItem:
if item == nil {
@ -155,8 +156,7 @@ func (bp *batchProcessor) processItem(item interface{}) {
if td, ok := item.(pdata.Traces); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize {
tdRemainSize := splitTrace(int(bp.sendBatchSize-itemCount), td)
item = tdRemainSize
item = splitTrace(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
@ -165,8 +165,7 @@ func (bp *batchProcessor) processItem(item interface{}) {
if td, ok := item.(pdata.Metrics); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.MetricCount()) > bp.sendBatchMaxSize {
tdRemainSize := splitMetrics(int(bp.sendBatchSize-itemCount), td)
item = tdRemainSize
item = splitMetrics(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
@ -175,8 +174,7 @@ func (bp *batchProcessor) processItem(item interface{}) {
if td, ok := item.(pdata.Logs); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.LogRecordCount()) > bp.sendBatchMaxSize {
tdRemainSize := splitLogs(int(bp.sendBatchSize-itemCount), td)
item = tdRemainSize
item = splitLogs(int(bp.sendBatchSize-itemCount), td)
go func() {
bp.newItem <- td
}()
@ -186,26 +184,31 @@ func (bp *batchProcessor) processItem(item interface{}) {
bp.batch.add(item)
if bp.batch.itemCount() >= bp.sendBatchSize {
bp.timer.Stop()
bp.stopTimer()
bp.sendItems(statBatchSizeTriggerSend)
bp.resetTimer()
}
}
func (bp *batchProcessor) stopTimer() {
if !bp.timer.Stop() {
<-bp.timer.C
}
}
func (bp *batchProcessor) resetTimer() {
bp.timer.Reset(bp.timeout)
}
func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {
func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processorTagKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount())))
stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount())))
if bp.telemetryLevel == configtelemetry.LevelDetailed {
_ = stats.RecordWithTags(context.Background(), statsTags, statBatchSendSizeBytes.M(int64(bp.batch.size())))
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
}
if err := bp.batch.export(context.Background()); err != nil {
if err := bp.batch.export(bp.exportCtx); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batch.reset()
@ -231,18 +234,18 @@ func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
}
// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchTraces(trace), telemetryLevel)
func newBatchTracesProcessor(params component.ProcessorCreateParams, next consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(params, cfg, newBatchTraces(next), telemetryLevel)
}
// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchMetrics(metrics), telemetryLevel)
func newBatchMetricsProcessor(params component.ProcessorCreateParams, next consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(params, cfg, newBatchMetrics(next), telemetryLevel)
}
// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchLogs(logs), telemetryLevel)
func newBatchLogsProcessor(params component.ProcessorCreateParams, next consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) {
return newBatchProcessor(params, cfg, newBatchLogs(next), telemetryLevel)
}
type batchTraces struct {

View File

@ -39,7 +39,8 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
requestCount := 1000
@ -80,7 +81,8 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
requestCount := 1000
@ -127,7 +129,8 @@ func TestBatchProcessorSentBySize(t *testing.T) {
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.Timeout = 500 * time.Millisecond
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
requestCount := 100
@ -189,7 +192,8 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
spansPerRequest := 10
start := time.Now()
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
for requestNum := 0; requestNum < requestCount; requestNum++ {
@ -235,7 +239,8 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
sink := new(consumertest.TracesSink)
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchTracesProcessor(creationParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
requestCount := 10
@ -265,7 +270,8 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
sink := new(consumertest.MetricsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
metricDataSlice := make([]pdata.Metrics, 0, requestCount)
@ -317,7 +323,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
sink := new(consumertest.MetricsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
start := time.Now()
@ -373,7 +380,8 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
sink := new(consumertest.MetricsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
start := time.Now()
@ -421,7 +429,8 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {
sink := new(consumertest.MetricsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
for requestNum := 0; requestNum < requestCount; requestNum++ {
@ -507,7 +516,8 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
sink := new(consumertest.LogsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
logDataSlice := make([]pdata.Logs, 0, requestCount)
@ -559,7 +569,8 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
sink := new(consumertest.LogsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
start := time.Now()
@ -615,7 +626,8 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
sink := new(consumertest.LogsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
start := time.Now()
@ -663,7 +675,8 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {
sink := new(consumertest.LogsSink)
createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
batcher, err := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
for requestNum := 0; requestNum < requestCount; requestNum++ {

View File

@ -57,9 +57,8 @@ func createTracesProcessor(
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
oCfg := cfg.(*Config)
level := configtelemetry.GetMetricsLevelFlagValue()
return newBatchTracesProcessor(params, nextConsumer, oCfg, level), nil
return newBatchTracesProcessor(params, nextConsumer, cfg.(*Config), level)
}
func createMetricsProcessor(
@ -68,9 +67,8 @@ func createMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)
level := configtelemetry.GetMetricsLevelFlagValue()
return newBatchMetricsProcessor(params, nextConsumer, oCfg, level), nil
return newBatchMetricsProcessor(params, nextConsumer, cfg.(*Config), level)
}
func createLogsProcessor(
@ -79,7 +77,6 @@ func createLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)
level := configtelemetry.GetMetricsLevelFlagValue()
return newBatchLogsProcessor(params, nextConsumer, oCfg, level), nil
return newBatchLogsProcessor(params, nextConsumer, cfg.(*Config), level)
}