opentelemetry-collector/processor/memorylimiterprocessor/memorylimiter.go

111 lines
3.5 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/processor"
)
type memoryLimiterProcessor struct {
memlimiter *memorylimiter.MemoryLimiter
obsrep *obsReport
}
// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiterProcessor(set processor.Settings, cfg *Config) (*memoryLimiterProcessor, error) {
ml, err := memorylimiter.NewMemoryLimiter(cfg, set.Logger)
if err != nil {
return nil, err
}
obsrep, err := newObsReport(set)
if err != nil {
return nil, err
}
p := &memoryLimiterProcessor{
memlimiter: ml,
obsrep: obsrep,
}
return p, nil
}
func (p *memoryLimiterProcessor) start(ctx context.Context, host component.Host) error {
return p.memlimiter.Start(ctx, host)
}
func (p *memoryLimiterProcessor) shutdown(ctx context.Context) error {
return p.memlimiter.Shutdown(ctx)
}
func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
numSpans := td.SpanCount()
if p.memlimiter.MustRefuse() {
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numSpans, pipeline.SignalTraces)
return td, memorylimiter.ErrDataRefused
}
// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numSpans, pipeline.SignalTraces)
return td, nil
}
func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
numDataPoints := md.DataPointCount()
if p.memlimiter.MustRefuse() {
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numDataPoints, pipeline.SignalMetrics)
return md, memorylimiter.ErrDataRefused
}
// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numDataPoints, pipeline.SignalMetrics)
return md, nil
}
func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
numRecords := ld.LogRecordCount()
if p.memlimiter.MustRefuse() {
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numRecords, pipeline.SignalLogs)
return ld, memorylimiter.ErrDataRefused
}
// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs)
return ld, nil
}
func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) {
numProfiles := td.SampleCount()
if p.memlimiter.MustRefuse() {
// TODO:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles)
return td, memorylimiter.ErrDataRefused
}
// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles)
return td, nil
}