// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/internal/memorylimiter"
|
|
"go.opentelemetry.io/collector/pdata/plog"
|
|
"go.opentelemetry.io/collector/pdata/pmetric"
|
|
"go.opentelemetry.io/collector/pdata/pprofile"
|
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
|
"go.opentelemetry.io/collector/pipeline"
|
|
"go.opentelemetry.io/collector/pipeline/xpipeline"
|
|
"go.opentelemetry.io/collector/processor"
|
|
)
|
|
|
|
type memoryLimiterProcessor struct {
|
|
memlimiter *memorylimiter.MemoryLimiter
|
|
obsrep *obsReport
|
|
}
|
|
|
|
// newMemoryLimiter returns a new memorylimiter processor.
|
|
func newMemoryLimiterProcessor(set processor.Settings, cfg *Config) (*memoryLimiterProcessor, error) {
|
|
ml, err := memorylimiter.NewMemoryLimiter(cfg, set.Logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
obsrep, err := newObsReport(set)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p := &memoryLimiterProcessor{
|
|
memlimiter: ml,
|
|
obsrep: obsrep,
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) start(ctx context.Context, host component.Host) error {
|
|
return p.memlimiter.Start(ctx, host)
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) shutdown(ctx context.Context) error {
|
|
return p.memlimiter.Shutdown(ctx)
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
|
|
numSpans := td.SpanCount()
|
|
if p.memlimiter.MustRefuse() {
|
|
// TODO:
|
|
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
|
|
p.obsrep.refused(ctx, numSpans, pipeline.SignalTraces)
|
|
return td, memorylimiter.ErrDataRefused
|
|
}
|
|
|
|
// Even if the next consumer returns error record the data as accepted by
|
|
// this processor.
|
|
p.obsrep.accepted(ctx, numSpans, pipeline.SignalTraces)
|
|
return td, nil
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
|
|
numDataPoints := md.DataPointCount()
|
|
if p.memlimiter.MustRefuse() {
|
|
// TODO:
|
|
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
|
|
p.obsrep.refused(ctx, numDataPoints, pipeline.SignalMetrics)
|
|
return md, memorylimiter.ErrDataRefused
|
|
}
|
|
|
|
// Even if the next consumer returns error record the data as accepted by
|
|
// this processor.
|
|
p.obsrep.accepted(ctx, numDataPoints, pipeline.SignalMetrics)
|
|
return md, nil
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
|
|
numRecords := ld.LogRecordCount()
|
|
if p.memlimiter.MustRefuse() {
|
|
// TODO:
|
|
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
|
|
p.obsrep.refused(ctx, numRecords, pipeline.SignalLogs)
|
|
return ld, memorylimiter.ErrDataRefused
|
|
}
|
|
|
|
// Even if the next consumer returns error record the data as accepted by
|
|
// this processor.
|
|
p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs)
|
|
return ld, nil
|
|
}
|
|
|
|
func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) {
|
|
numProfiles := td.SampleCount()
|
|
if p.memlimiter.MustRefuse() {
|
|
// TODO:
|
|
// https://github.com/open-telemetry/opentelemetry-collector/issues/12463
|
|
p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles)
|
|
return td, memorylimiter.ErrDataRefused
|
|
}
|
|
|
|
// Even if the next consumer returns error record the data as accepted by
|
|
// this processor.
|
|
p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles)
|
|
return td, nil
|
|
}
|