Add a memory limiter processor (#498)

This adds a processor that drops data according to configured memory limits.
The processor is important for high-load situations, when the receiving rate exceeds the
exporting rate (an extreme case of this is when the export target is unavailable).

A typical production run will need to have this processor included in every pipeline,
immediately after the batch processor.
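For illustration, a pipeline with that placement might look like the following sketch (the receiver and exporter names are placeholders, not part of this commit; the full set of memory_limiter options appears in the testdata config further down):

receivers:
  examplereceiver:
exporters:
  exampleexporter:
processors:
  batch:
  memory_limiter:
    check_interval: 1s
    limit_mib: 4000
service:
  pipelines:
    traces:
      receivers: [examplereceiver]
      processors: [batch, memory_limiter]
      exporters: [exampleexporter]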
Tigran Najaryan 2020-01-14 13:20:07 -05:00 committed by GitHub
parent 9778b16b81
commit 21a70d61d6
13 changed files with 922 additions and 13 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -87,6 +88,7 @@ func Components() (
&attributesprocessor.Factory{},
&queuedprocessor.Factory{},
&batchprocessor.Factory{},
&memorylimiter.Factory{},
&tailsamplingprocessor.Factory{},
&probabilisticsamplerprocessor.Factory{},
)

View File

@@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -64,6 +65,7 @@ func TestDefaultComponents(t *testing.T) {
"attributes": &attributesprocessor.Factory{},
"queued_retry": &queuedprocessor.Factory{},
"batch": &batchprocessor.Factory{},
"memory_limiter": &memorylimiter.Factory{},
"tail_sampling": &tailsamplingprocessor.Factory{},
"probabilistic_sampler": &probabilisticsamplerprocessor.Factory{},
}

View File

@@ -0,0 +1,49 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package memorylimiter provides a processor for the OpenTelemetry Service
// pipeline that drops data on the pipeline according to the current state of
// memory usage.
package memorylimiter

import (
"time"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// Config defines configuration for the memory limiter processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// CheckInterval is the time between measurements of memory usage for the
// purposes of avoiding going over the limits. Defaults to zero, so no
// checks will be performed.
CheckInterval time.Duration `mapstructure:"check_interval"`
// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
// allocated by the process.
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`
// MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the
// measurements of memory usage.
MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"`
// BallastSizeMiB is the size, in MiB, of the ballast being used by the
// process.
BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"`
}

// Name of BallastSizeMiB config option.
const ballastSizeMibKey = "ballast_size_mib"
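
A quick sketch of how these limits combine (example values taken from the testdata config below; the MiB conversion mirrors the one in the factory):

package main

import "fmt"

func main() {
	const mibBytes = 1024 * 1024
	limit := uint64(4000) * mibBytes // limit_mib
	spike := uint64(500) * mibBytes  // spike_limit_mib
	// Per the drop condition implemented in memorylimiter.go, data starts
	// being dropped once alloc (with ballast_size_mib already subtracted)
	// rises to within spike of limit -- here, at 3500 MiB.
	fmt.Printf("drop threshold: %d MiB\n", (limit-spike)/mibBytes)
}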

View File

@@ -0,0 +1,65 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)
func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)
factory := &Factory{}
factories.Processors[typeStr] = factory
require.NoError(t, err)
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "config.yaml"),
factories)
require.Nil(t, err)
require.NotNil(t, config)
p0 := config.Processors["memory_limiter"]
assert.Equal(t, p0,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "memory_limiter",
NameVal: "memory_limiter",
},
})
p1 := config.Processors["memory_limiter/with-settings"]
assert.Equal(t, p1,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "memory_limiter",
NameVal: "memory_limiter/with-settings",
},
CheckInterval: 5 * time.Second,
MemoryLimitMiB: 4000,
MemorySpikeLimitMiB: 500,
BallastSizeMiB: 2000,
})
}

View File

@@ -0,0 +1,86 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/processor"
)
const (
// The value of the "type" key in the configuration.
typeStr = "memory_limiter"
)

// Factory is the factory for the memory limiter processor.
type Factory struct {
}
// Type gets the type of the config created by this factory.
func (f *Factory) Type() string {
return typeStr
}
// CreateDefaultConfig creates the default configuration for the processor.
// Notice that creating a processor from this default configuration is
// expected to fail, since the required settings must be set explicitly.
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
}
}
// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
return f.createProcessor(logger, nextConsumer, nil, cfg)
}
// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return f.createProcessor(logger, nil, nextConsumer, cfg)
}
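// createProcessor is the shared implementation behind the trace and metrics
// variants: it converts the MiB-denominated config values to bytes and
// delegates to New.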
func (f *Factory) createProcessor(
logger *zap.Logger,
traceConsumer consumer.TraceConsumer,
metricConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.DualTypeProcessor, error) {
const mibBytes = 1024 * 1024
pCfg := cfg.(*Config)
return New(
cfg.Name(),
traceConsumer,
metricConsumer,
pCfg.CheckInterval,
uint64(pCfg.MemoryLimitMiB)*mibBytes,
uint64(pCfg.MemorySpikeLimitMiB)*mibBytes,
uint64(pCfg.BallastSizeMiB)*mibBytes,
logger,
)
}
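
A hypothetical caller could wire the factory up by hand like this (a sketch assuming the package layout in this commit; normally the collector's config loader drives the factory, and the nop exporter is the same one the tests below use):

package main

import (
	"log"
	"time"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
	"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
)

func main() {
	factory := &memorylimiter.Factory{}
	cfg := factory.CreateDefaultConfig().(*memorylimiter.Config)
	// The default config is rejected by New, so fill in the required settings.
	cfg.CheckInterval = time.Second
	cfg.MemoryLimitMiB = 4000
	tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer tp.Shutdown()
}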

View File

@@ -0,0 +1,69 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
)
func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
require.NotNil(t, factory)
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}
func TestCreateProcessor(t *testing.T) {
factory := &Factory{}
require.NotNil(t, factory)
cfg := factory.CreateDefaultConfig()
// This processor can't be created with the default config.
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.Nil(t, tp)
assert.Error(t, err, "created processor with invalid settings")
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
assert.Nil(t, mp)
assert.Error(t, err, "created processor with invalid settings")
// Create processor with a valid config.
pCfg := cfg.(*Config)
pCfg.MemoryLimitMiB = 5722
pCfg.MemorySpikeLimitMiB = 1907
pCfg.BallastSizeMiB = 2048
pCfg.CheckInterval = 100 * time.Millisecond
tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)
assert.NoError(t, tp.Shutdown())
mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
assert.NoError(t, mp.Shutdown())
}

View File

@@ -0,0 +1,225 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"context"
"errors"
"runtime"
"sync/atomic"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/processor"
)
var (
// errForcedDrop will be returned to callers of ConsumeTraceData or
// ConsumeMetricsData to indicate that data is being dropped due to high
// memory usage.
errForcedDrop = errors.New("data dropped due to high memory usage")
// Construction errors
errNilNextConsumer = errors.New("nil nextConsumer")
errCheckIntervalOutOfRange = errors.New(
"checkInterval must be greater than zero")
errMemAllocLimitOutOfRange = errors.New(
"memAllocLimit must be greater than zero")
errMemSpikeLimitOutOfRange = errors.New(
"memSpikeLimit must be smaller than memAllocLimit")
)
type memoryLimiter struct {
traceConsumer consumer.TraceConsumer
metricsConsumer consumer.MetricsConsumer
memAllocLimit uint64
memSpikeLimit uint64
memCheckWait time.Duration
ballastSize uint64
// forceDrop is used atomically to indicate when data should be dropped.
forceDrop int64
ticker *time.Ticker
// The function to read the mem values is set as a reference to help with
// testing different values.
readMemStatsFn func(m *runtime.MemStats)
statsTags []tag.Mutator
// Fields used for logging.
procName string
logger *zap.Logger
configMismatchedLogged bool
}
var _ processor.DualTypeProcessor = (*memoryLimiter)(nil)
// New returns a new memorylimiter processor.
func New(
name string,
traceConsumer consumer.TraceConsumer,
metricsConsumer consumer.MetricsConsumer,
checkInterval time.Duration,
memAllocLimit uint64,
memSpikeLimit uint64,
ballastSize uint64,
logger *zap.Logger,
) (processor.DualTypeProcessor, error) {
if traceConsumer == nil && metricsConsumer == nil {
return nil, errNilNextConsumer
}
if checkInterval <= 0 {
return nil, errCheckIntervalOutOfRange
}
if memAllocLimit == 0 {
return nil, errMemAllocLimitOutOfRange
}
if memSpikeLimit >= memAllocLimit {
return nil, errMemSpikeLimitOutOfRange
}
ml := &memoryLimiter{
traceConsumer: traceConsumer,
metricsConsumer: metricsConsumer,
memAllocLimit: memAllocLimit,
memSpikeLimit: memSpikeLimit,
memCheckWait: checkInterval,
ballastSize: ballastSize,
ticker: time.NewTicker(checkInterval),
readMemStatsFn: runtime.ReadMemStats,
statsTags: statsTagsForBatch(name),
procName: name,
logger: logger,
}
initMetrics()
ml.startMonitoring()
return ml, nil
}
func (ml *memoryLimiter) ConsumeTraceData(
ctx context.Context,
td consumerdata.TraceData,
) error {
if ml.forcingDrop() {
numSpans := len(td.Spans)
stats.RecordWithTags(
context.Background(),
ml.statsTags,
StatDroppedSpanCount.M(int64(numSpans)))
return errForcedDrop
}
return ml.traceConsumer.ConsumeTraceData(ctx, td)
}
func (ml *memoryLimiter) ConsumeMetricsData(
ctx context.Context,
md consumerdata.MetricsData,
) error {
if ml.forcingDrop() {
numMetrics := len(md.Metrics)
stats.RecordWithTags(
context.Background(),
ml.statsTags,
StatDroppedMetricCount.M(int64(numMetrics)))
return errForcedDrop
}
return ml.metricsConsumer.ConsumeMetricsData(ctx, md)
}
func (ml *memoryLimiter) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}
func (ml *memoryLimiter) Start(host component.Host) error {
return nil
}
func (ml *memoryLimiter) Shutdown() error {
ml.ticker.Stop()
return nil
}
func (ml *memoryLimiter) readMemStats(ms *runtime.MemStats) {
ml.readMemStatsFn(ms)
// If properly configured, ms.Alloc should be at least ml.ballastSize, but
// since a misconfiguration is possible, check for that here.
if ms.Alloc >= ml.ballastSize {
ms.Alloc -= ml.ballastSize
} else {
// This indicates misconfiguration. Log it once.
if !ml.configMismatchedLogged {
ml.configMismatchedLogged = true
ml.logger.Warn(typeStr+" is likely incorrectly configured. "+ballastSizeMibKey+
" must be set equal to --mem-ballast-size-mib command line option.",
zap.String("processor", ml.procName))
}
}
}
// startMonitoring starts a ticker'd goroutine that will check memory usage
// every checkInterval period.
func (ml *memoryLimiter) startMonitoring() {
go func() {
for range ml.ticker.C {
ml.memCheck()
}
}()
}
// forcingDrop reports whether incoming data should currently be dropped to
// release memory resources.
func (ml *memoryLimiter) forcingDrop() bool {
return atomic.LoadInt64(&ml.forceDrop) != 0
}
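// memCheck samples the current memory stats and updates the forceDrop flag
// accordingly.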
func (ml *memoryLimiter) memCheck() {
ms := &runtime.MemStats{}
ml.readMemStats(ms)
ml.memLimiting(ms)
}
func (ml *memoryLimiter) shouldForceDrop(ms *runtime.MemStats) bool {
return ml.memAllocLimit <= ms.Alloc || ml.memAllocLimit-ms.Alloc <= ml.memSpikeLimit
}
func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) {
if !ml.shouldForceDrop(ms) {
atomic.StoreInt64(&ml.forceDrop, 0)
} else {
atomic.StoreInt64(&ml.forceDrop, 1)
// Force a GC at this point and see if this is enough to get to
// the desired level.
runtime.GC()
}
}
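
To make the drop condition concrete, here is a self-contained sketch of the same comparison (names local to this sketch), using the limit (1024) and spike limit (512) values that the unit tests in the next file use:

package main

import "fmt"

// shouldDrop restates the shouldForceDrop comparison for illustration.
func shouldDrop(limit, spike, alloc uint64) bool {
	return limit <= alloc || limit-alloc <= spike
}

func main() {
	const limit, spike = 1024, 512
	for _, alloc := range []uint64{500, 550, 1800} {
		// Prints: 500 false, 550 true, 1800 true.
		fmt.Println(alloc, shouldDrop(limit, spike, alloc))
	}
}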

View File

@@ -0,0 +1,217 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"context"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
)
func TestNew(t *testing.T) {
type args struct {
nextConsumer consumer.TraceConsumer
checkInterval time.Duration
memAllocLimit uint64
memSpikeLimit uint64
ballastSize uint64
}
sink := new(exportertest.SinkTraceExporter)
tests := []struct {
name string
args args
wantErr error
}{
{
name: "nil_nextConsumer",
wantErr: errNilNextConsumer,
},
{
name: "zero_checkInterval",
args: args{
nextConsumer: sink,
},
wantErr: errCheckIntervalOutOfRange,
},
{
name: "zero_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
},
wantErr: errMemAllocLimitOutOfRange,
},
{
name: "memSpikeLimit_gt_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memAllocLimit: 1024,
memSpikeLimit: 2048,
},
wantErr: errMemSpikeLimitOutOfRange,
},
{
name: "success",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memAllocLimit: 1e10,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(
"test",
tt.args.nextConsumer,
nil,
tt.args.checkInterval,
tt.args.memAllocLimit,
tt.args.memSpikeLimit,
tt.args.ballastSize,
zap.NewNop())
if err != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != nil {
assert.NoError(t, got.Shutdown())
}
})
}
}
// TestMetricsMemoryPressureResponse manipulates results from querying memory
// and checks the expected side effects.
func TestMetricsMemoryPressureResponse(t *testing.T) {
var currentMemAlloc uint64
sink := new(exportertest.SinkMetricsExporter)
ml := &memoryLimiter{
metricsConsumer: sink,
memAllocLimit: 1024,
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
}
ctx := context.Background()
td := consumerdata.MetricsData{}
// Below memAllocLimit.
currentMemAlloc = 800
ml.memCheck()
assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
// Above memAllocLimit.
currentMemAlloc = 1800
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
// Check ballast effect
ml.ballastSize = 1000
// Below memAllocLimit accounting for ballast.
currentMemAlloc = 800 + ml.ballastSize
ml.memCheck()
assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
// Above memAllocLimit even accounting for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
// Restore ballast to default.
ml.ballastSize = 0
// Check spike limit
ml.memSpikeLimit = 512
// Below memSpikeLimit.
currentMemAlloc = 500
ml.memCheck()
assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
// Above memSpikeLimit.
currentMemAlloc = 550
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
}
// TestTraceMemoryPressureResponse manipulates results from querying memory
// and checks the expected side effects.
func TestTraceMemoryPressureResponse(t *testing.T) {
var currentMemAlloc uint64
sink := new(exportertest.SinkTraceExporter)
ml := &memoryLimiter{
traceConsumer: sink,
memAllocLimit: 1024,
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
}
ctx := context.Background()
td := consumerdata.TraceData{}
// Below memAllocLimit.
currentMemAlloc = 800
ml.memCheck()
assert.NoError(t, ml.ConsumeTraceData(ctx, td))
// Above memAllocLimit.
currentMemAlloc = 1800
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
// Check ballast effect
ml.ballastSize = 1000
// Below memAllocLimit accounting for ballast.
currentMemAlloc = 800 + ml.ballastSize
ml.memCheck()
assert.NoError(t, ml.ConsumeTraceData(ctx, td))
// Above memAllocLimit even accounting for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
// Restore ballast to default.
ml.ballastSize = 0
// Check spike limit
ml.memSpikeLimit = 512
// Below memSpikeLimit.
currentMemAlloc = 500
ml.memCheck()
assert.NoError(t, ml.ConsumeTraceData(ctx, td))
// Above memSpikeLimit.
currentMemAlloc = 550
ml.memCheck()
assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
}

View File

@@ -0,0 +1,94 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file contains metrics to record data dropped via the memory limiter;
// the package and its init wouldn't be necessary if proper dependencies were
// exposed via packages.
package memorylimiter

import (
"sync"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
// Keys and stats for telemetry.
var (
TagExporterNameKey, _ = tag.NewKey("exporter")
StatDroppedSpanCount = stats.Int64(
"spans_dropped",
"counts the number of spans dropped",
stats.UnitDimensionless)
StatDroppedMetricCount = stats.Int64(
"metrics_dropped",
"counts the number of metrics dropped",
stats.UnitDimensionless)
)
var initOnce sync.Once
func initMetrics() {
initOnce.Do(func() {
tagKeys := []tag.Key{
TagExporterNameKey,
}
droppedSpanBatchesView := &view.View{
Name: "batches_dropped",
Measure: StatDroppedSpanCount,
Description: "The number of span batches dropped.",
TagKeys: tagKeys,
Aggregation: view.Count(),
}
droppedSpansView := &view.View{
Name: StatDroppedSpanCount.Name(),
Measure: StatDroppedSpanCount,
Description: "The number of spans dropped.",
TagKeys: tagKeys,
Aggregation: view.Sum(),
}
droppedMetricBatchesView := &view.View{
Name: "batches_dropped",
Measure: StatDroppedMetricCount,
Description: "The number of metric batches dropped.",
TagKeys: tagKeys,
Aggregation: view.Count(),
}
droppedMetricsView := &view.View{
Name: StatDroppedMetricCount.Name(),
Measure: StatDroppedMetricCount,
Description: "The number of metrics dropped.",
TagKeys: tagKeys,
Aggregation: view.Sum(),
}
view.Register(droppedSpanBatchesView, droppedSpansView, droppedMetricBatchesView, droppedMetricsView)
})
}
// statsTagsForBatch creates a list of tag.Mutator that can be used to add a
// metric label with the processorName to a context.Context via the
// stats.RecordWithTags function. This ensures uniformity of labels for the
// metrics.
func statsTagsForBatch(processorName string) []tag.Mutator {
statsTags := []tag.Mutator{
tag.Upsert(TagExporterNameKey, processorName),
}
return statsTags
}
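
As a sketch of how these pieces fit together (mirroring the stats.RecordWithTags calls in memorylimiter.go; this snippet assumes it lives in the same package, since statsTagsForBatch is unexported, and the helper name is illustrative only):

package memorylimiter

import (
	"context"

	"go.opencensus.io/stats"
)

// exampleRecordDrop records ten dropped spans against the "memory_limiter"
// processor name, the same way ConsumeTraceData does on a forced drop.
// Not part of the commit; for illustration only.
func exampleRecordDrop() {
	initMetrics() // registers the dropped-data views exactly once
	tags := statsTagsForBatch("memory_limiter")
	_ = stats.RecordWithTags(context.Background(), tags, StatDroppedSpanCount.M(10))
}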

View File

@@ -0,0 +1,36 @@
receivers:
examplereceiver:
processors:
memory_limiter:
# empty config
memory_limiter/with-settings:
# check_interval is the time between measurements of memory usage for the
# purposes of avoiding going over the limits. Defaults to zero, so no
# checks will be performed. Values below 1 second are not recommended since
# it can result in unnecessary CPU consumption.
check_interval: 5s
# Maximum amount of memory, in MiB, targeted to be allocated by the process heap.
# Note that typically the total memory usage of the process will be about 50 MiB
# higher than this value.
limit_mib: 4000
# The maximum, in MiB, spike expected between the measurements of memory usage.
spike_limit_mib: 500
# ballast_size_mib is the size, in MiB, of the ballast being used by the process.
# This must match the value of the --mem-ballast-size-mib command line option
# (if used); otherwise the memory limiter will not work correctly.
ballast_size_mib: 2000
exporters:
exampleexporter:
service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [memory_limiter/with-settings]
exporters: [exampleexporter]

View File

@@ -41,6 +41,12 @@ type MetricsProcessor interface {
Processor
}
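// DualTypeProcessor is a processor that can consume and process both trace
// and metrics data.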
type DualTypeProcessor interface {
consumer.TraceConsumer
consumer.MetricsConsumer
Processor
}
// Capabilities describes the capabilities of TraceProcessor or MetricsProcessor.
type Capabilities struct {
// MutatesConsumedData is set to true if ConsumeTraceData or ConsumeMetricsData

View File

@@ -0,0 +1,29 @@
receivers:
jaeger:
protocols:
thrift-http:
endpoint: "localhost:14268"
opencensus:
endpoint: "localhost:55678"
exporters:
opencensus:
endpoint: "localhost:56565"
logging:
loglevel: debug
processors:
queued_retry:
memory_limiter:
check_interval: 1s
limit_mib: 10
service:
pipelines:
traces:
receivers: [jaeger]
processors: [memory_limiter,queued_retry]
exporters: [opencensus,logging]
metrics:
receivers: [opencensus]
exporters: [opencensus,logging]

View File

@@ -24,6 +24,8 @@ import (
"path"
"testing"
"github.com/stretchr/testify/assert"
"github.com/open-telemetry/opentelemetry-collector/testbed/testbed"
)
@@ -73,22 +75,49 @@ func TestTrace10kSPS(t *testing.T) {
}
func TestTraceNoBackend10kSPSJaeger(t *testing.T) {
	tests := []struct {
		name                string
		configFileName      string
		expectedMaxRAM      uint32
		expectedMinFinalRAM uint32
	}{
		{name: "NoMemoryLimiter", configFileName: "agent-config.yaml", expectedMaxRAM: 200, expectedMinFinalRAM: 100},

		// Memory limiter in memory-limiter.yaml is configured to allow max 10MiB of heap size.
		// However, heap is not the only memory user, so the total limit we set for this
		// test is 60MiB. Note: to ensure this test verifies memorylimiter correctly,
		// expectedMaxRAM of this test case must be lower than expectedMinFinalRAM of the
		// previous test case (which runs without memorylimiter).
		{name: "MemoryLimiter", configFileName: "memory-limiter.yaml", expectedMaxRAM: 60, expectedMinFinalRAM: 10},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			configFilePath := path.Join("testdata", test.configFileName)

			tc := testbed.NewTestCase(
				t,
				testbed.NewJaegerThriftDataSender(testbed.DefaultJaegerPort),
				testbed.NewOCDataReceiver(testbed.DefaultOCPort),
				testbed.WithConfigFile(configFilePath),
			)
			defer tc.Stop()

			tc.SetResourceLimits(testbed.ResourceSpec{
				ExpectedMaxCPU: 60,
				ExpectedMaxRAM: test.expectedMaxRAM,
			})

			tc.StartAgent()
			tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000})
			tc.Sleep(tc.Duration)

			rss, _, _ := tc.AgentMemoryInfo()
			assert.True(t, rss > test.expectedMinFinalRAM)
		})
	}
}
func TestTrace1kSPSWithAttrs(t *testing.T) {