154 lines
4.8 KiB
Go
154 lines
4.8 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
//go:generate mdatagen metadata.yaml
|
|
|
|
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/consumer"
|
|
"go.opentelemetry.io/collector/consumer/xconsumer"
|
|
"go.opentelemetry.io/collector/internal/memorylimiter"
|
|
"go.opentelemetry.io/collector/internal/telemetry"
|
|
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
|
|
"go.opentelemetry.io/collector/processor"
|
|
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
|
|
"go.opentelemetry.io/collector/processor/processorhelper"
|
|
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
|
|
"go.opentelemetry.io/collector/processor/xprocessor"
|
|
)
|
|
|
|
var processorCapabilities = consumer.Capabilities{MutatesData: false}
|
|
|
|
type factory struct {
|
|
// memoryLimiters stores memoryLimiter instances with unique configs that multiple processors can reuse.
|
|
// This avoids running multiple memory checks (ie: GC) for every processor using the same processor config.
|
|
memoryLimiters map[component.Config]*memoryLimiterProcessor
|
|
lock sync.Mutex
|
|
}
|
|
|
|
// NewFactory returns a new factory for the Memory Limiter processor.
|
|
func NewFactory() xprocessor.Factory {
|
|
f := &factory{
|
|
memoryLimiters: map[component.Config]*memoryLimiterProcessor{},
|
|
}
|
|
return xprocessor.NewFactory(
|
|
metadata.Type,
|
|
createDefaultConfig,
|
|
xprocessor.WithTraces(f.createTraces, metadata.TracesStability),
|
|
xprocessor.WithMetrics(f.createMetrics, metadata.MetricsStability),
|
|
xprocessor.WithLogs(f.createLogs, metadata.LogsStability),
|
|
xprocessor.WithProfiles(f.createProfiles, metadata.ProfilesStability))
|
|
}
|
|
|
|
// CreateDefaultConfig creates the default configuration for processor. Notice
|
|
// that the default configuration is expected to fail for this processor.
|
|
func createDefaultConfig() component.Config {
|
|
return memorylimiter.NewDefaultConfig()
|
|
}
|
|
|
|
func (f *factory) createTraces(
|
|
ctx context.Context,
|
|
set processor.Settings,
|
|
cfg component.Config,
|
|
nextConsumer consumer.Traces,
|
|
) (processor.Traces, error) {
|
|
memLimiter, err := f.getMemoryLimiter(set, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer,
|
|
memLimiter.processTraces,
|
|
processorhelper.WithCapabilities(processorCapabilities),
|
|
processorhelper.WithStart(memLimiter.start),
|
|
processorhelper.WithShutdown(memLimiter.shutdown))
|
|
}
|
|
|
|
func (f *factory) createMetrics(
|
|
ctx context.Context,
|
|
set processor.Settings,
|
|
cfg component.Config,
|
|
nextConsumer consumer.Metrics,
|
|
) (processor.Metrics, error) {
|
|
memLimiter, err := f.getMemoryLimiter(set, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer,
|
|
memLimiter.processMetrics,
|
|
processorhelper.WithCapabilities(processorCapabilities),
|
|
processorhelper.WithStart(memLimiter.start),
|
|
processorhelper.WithShutdown(memLimiter.shutdown))
|
|
}
|
|
|
|
func (f *factory) createLogs(
|
|
ctx context.Context,
|
|
set processor.Settings,
|
|
cfg component.Config,
|
|
nextConsumer consumer.Logs,
|
|
) (processor.Logs, error) {
|
|
memLimiter, err := f.getMemoryLimiter(set, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer,
|
|
memLimiter.processLogs,
|
|
processorhelper.WithCapabilities(processorCapabilities),
|
|
processorhelper.WithStart(memLimiter.start),
|
|
processorhelper.WithShutdown(memLimiter.shutdown))
|
|
}
|
|
|
|
func (f *factory) createProfiles(
|
|
ctx context.Context,
|
|
set processor.Settings,
|
|
cfg component.Config,
|
|
nextConsumer xconsumer.Profiles,
|
|
) (xprocessor.Profiles, error) {
|
|
memLimiter, err := f.getMemoryLimiter(set, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return xprocessorhelper.NewProfiles(
|
|
ctx,
|
|
set,
|
|
cfg,
|
|
nextConsumer,
|
|
memLimiter.processProfiles,
|
|
xprocessorhelper.WithCapabilities(processorCapabilities),
|
|
xprocessorhelper.WithStart(memLimiter.start),
|
|
xprocessorhelper.WithShutdown(memLimiter.shutdown),
|
|
)
|
|
}
|
|
|
|
// getMemoryLimiter checks if we have a cached memoryLimiter with a specific config,
|
|
// otherwise initialize and add one to the store.
|
|
func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) {
|
|
f.lock.Lock()
|
|
defer f.lock.Unlock()
|
|
|
|
if memLimiter, ok := f.memoryLimiters[cfg]; ok {
|
|
return memLimiter, nil
|
|
}
|
|
|
|
set.TelemetrySettings = telemetry.WithoutAttributes(
|
|
set.TelemetrySettings,
|
|
componentattribute.SignalKey,
|
|
componentattribute.PipelineIDKey,
|
|
componentattribute.ComponentIDKey,
|
|
)
|
|
set.Logger.Debug("created singleton logger")
|
|
|
|
memLimiter, err := newMemoryLimiterProcessor(set, cfg.(*Config))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
f.memoryLimiters[cfg] = memLimiter
|
|
return memLimiter, nil
|
|
}
|