opentelemetry-collector/processor/memorylimiterprocessor/factory.go

116 lines
3.6 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:generate mdatagen metadata.yaml
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
import (
"context"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/processorhelper"
)
var processorCapabilities = consumer.Capabilities{MutatesData: false}
type factory struct {
// memoryLimiters stores memoryLimiter instances with unique configs that multiple processors can reuse.
// This avoids running multiple memory checks (ie: GC) for every processor using the same processor config.
memoryLimiters map[component.Config]*memoryLimiterProcessor
lock sync.Mutex
}
// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() processor.Factory {
f := &factory{
memoryLimiters: map[component.Config]*memoryLimiterProcessor{},
}
return processor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(f.createTraces, metadata.TracesStability),
processor.WithMetrics(f.createMetrics, metadata.MetricsStability),
processor.WithLogs(f.createLogs, metadata.LogsStability))
}
// CreateDefaultConfig creates the default configuration for processor. Notice
// that the default configuration is expected to fail for this processor.
func createDefaultConfig() component.Config {
return &Config{}
}
func (f *factory) createTraces(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer,
memLimiter.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}
func (f *factory) createMetrics(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer,
memLimiter.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}
func (f *factory) createLogs(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer,
memLimiter.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}
// getMemoryLimiter checks if we have a cached memoryLimiter with a specific config,
// otherwise initialize and add one to the store.
func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) {
f.lock.Lock()
defer f.lock.Unlock()
if memLimiter, ok := f.memoryLimiters[cfg]; ok {
return memLimiter, nil
}
memLimiter, err := newMemoryLimiterProcessor(set, cfg.(*Config))
if err != nil {
return nil, err
}
f.memoryLimiters[cfg] = memLimiter
return memLimiter, nil
}