diff --git a/defaults/defaults.go b/defaults/defaults.go
index 3c17059bc0..c1a35eba91 100644
--- a/defaults/defaults.go
+++ b/defaults/defaults.go
@@ -32,6 +32,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector/processor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
+	"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
 	"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -87,6 +88,7 @@ func Components() (
 		&attributesprocessor.Factory{},
 		&queuedprocessor.Factory{},
 		&batchprocessor.Factory{},
+		&memorylimiter.Factory{},
 		&tailsamplingprocessor.Factory{},
 		&probabilisticsamplerprocessor.Factory{},
 	)
diff --git a/defaults/defaults_test.go b/defaults/defaults_test.go
index cf0ce1b7b7..3271f216d3 100644
--- a/defaults/defaults_test.go
+++ b/defaults/defaults_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector/processor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
+	"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
 	"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -64,6 +65,7 @@ func TestDefaultComponents(t *testing.T) {
 		"attributes":            &attributesprocessor.Factory{},
 		"queued_retry":          &queuedprocessor.Factory{},
 		"batch":                 &batchprocessor.Factory{},
+		"memory_limiter":        &memorylimiter.Factory{},
 		"tail_sampling":         &tailsamplingprocessor.Factory{},
 		"probabilistic_sampler": &probabilisticsamplerprocessor.Factory{},
 	}
diff --git a/processor/memorylimiter/config.go b/processor/memorylimiter/config.go
new file mode 100644
index 0000000000..6c7c3840ca
--- /dev/null
+++ b/processor/memorylimiter/config.go
@@ -0,0 +1,49 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package memorylimiter provides a processor for OpenTelemetry Service pipeline
+// that drops data on the pipeline according to the current state of memory
+// usage.
+package memorylimiter
+
+import (
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
+)
+
+// Config defines configuration for the memory limiter processor.
+type Config struct {
+	configmodels.ProcessorSettings `mapstructure:",squash"`
+
+	// CheckInterval is the time between measurements of memory usage for the
+	// purposes of avoiding going over the limits. Defaults to zero, which New
+	// rejects, so it must be set to a positive duration.
+	CheckInterval time.Duration `mapstructure:"check_interval"`
+
+	// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
+	// allocated by the process. It must be greater than zero.
+	MemoryLimitMiB uint32 `mapstructure:"limit_mib"`
+
+	// MemorySpikeLimitMiB is the maximum spike, in MiB, expected between the
+	// measurements of memory usage. It must be smaller than MemoryLimitMiB.
+	MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"`
+
+	// BallastSizeMiB is the size, in MiB, of the memory ballast being used by
+	// the process.
+	BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"`
+}
+
+// Name of BallastSizeMiB config option.
+const ballastSizeMibKey = "ballast_size_mib"
diff --git a/processor/memorylimiter/config_test.go b/processor/memorylimiter/config_test.go
new file mode 100644
index 0000000000..b5367f7669
--- /dev/null
+++ b/processor/memorylimiter/config_test.go
@@ -0,0 +1,65 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memorylimiter
+
+import (
+	"path"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/open-telemetry/opentelemetry-collector/config"
+	"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
+)
+
+// TestLoadConfig verifies that testdata/config.yaml is decoded into the
+// expected Config values for both the empty and the fully-populated entry.
+func TestLoadConfig(t *testing.T) {
+	factories, err := config.ExampleComponents()
+	require.NoError(t, err)
+
+	// Register this package's factory so the loader knows the processor type.
+	factories.Processors[typeStr] = &Factory{}
+
+	// Named cfg so the imported config package is not shadowed.
+	cfg, err := config.LoadConfigFile(
+		t, path.Join(".", "testdata", "config.yaml"), factories)
+	require.NoError(t, err)
+	require.NotNil(t, cfg)
+
+	p0 := cfg.Processors["memory_limiter"]
+	assert.Equal(t,
+		&Config{
+			ProcessorSettings: configmodels.ProcessorSettings{
+				TypeVal: "memory_limiter",
+				NameVal: "memory_limiter",
+			},
+		}, p0)
+
+	p1 := cfg.Processors["memory_limiter/with-settings"]
+	assert.Equal(t,
+		&Config{
+			ProcessorSettings: configmodels.ProcessorSettings{
+				TypeVal: "memory_limiter",
+				NameVal: "memory_limiter/with-settings",
+			},
+			CheckInterval:       5 * time.Second,
+			MemoryLimitMiB:      4000,
+			MemorySpikeLimitMiB: 500,
+			BallastSizeMiB:      2000,
+		}, p1)
+}
diff --git a/processor/memorylimiter/factory.go b/processor/memorylimiter/factory.go
new file mode 100644
index 0000000000..392f259b45
--- /dev/null
+++ b/processor/memorylimiter/factory.go
@@ -0,0 +1,86 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memorylimiter
+
+import (
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
+	"github.com/open-telemetry/opentelemetry-collector/consumer"
+	"github.com/open-telemetry/opentelemetry-collector/processor"
+)
+
+const (
+	// The value of the "type" key in configuration.
+	typeStr = "memory_limiter"
+)
+
+// Factory is the factory for the memory limiter processor.
+type Factory struct {
+}
+
+// Type gets the type of the config created by this factory.
+func (f *Factory) Type() string {
+	return typeStr
+}
+
+// CreateDefaultConfig creates the default configuration for the processor. It
+// leaves the check interval and limits at zero, which New rejects as invalid.
+func (f *Factory) CreateDefaultConfig() configmodels.Processor {
+	return &Config{
+		ProcessorSettings: configmodels.ProcessorSettings{
+			TypeVal: typeStr,
+			NameVal: typeStr,
+		},
+	}
+}
+
+// CreateTraceProcessor creates a trace processor based on this config.
+func (f *Factory) CreateTraceProcessor(
+	logger *zap.Logger,
+	nextConsumer consumer.TraceConsumer,
+	cfg configmodels.Processor,
+) (processor.TraceProcessor, error) {
+	return f.createProcessor(logger, nextConsumer, nil, cfg)
+}
+
+// CreateMetricsProcessor creates a metrics processor based on this config.
+func (f *Factory) CreateMetricsProcessor(
+	logger *zap.Logger,
+	nextConsumer consumer.MetricsConsumer,
+	cfg configmodels.Processor,
+) (processor.MetricsProcessor, error) {
+	return f.createProcessor(logger, nil, nextConsumer, cfg) // metrics only: no trace consumer
+}
+
+func (f *Factory) createProcessor(
+	logger *zap.Logger,
+	traceConsumer consumer.TraceConsumer,
+	metricConsumer consumer.MetricsConsumer,
+	cfg configmodels.Processor,
+) (processor.DualTypeProcessor, error) {
+	const mibBytes = 1024 * 1024 // bytes per MiB; New takes its limits in bytes
+	pCfg := cfg.(*Config) // panics if cfg is not a *Config
+	return New(
+		cfg.Name(),
+		traceConsumer,
+		metricConsumer,
+		pCfg.CheckInterval,
+		uint64(pCfg.MemoryLimitMiB)*mibBytes,
+		uint64(pCfg.MemorySpikeLimitMiB)*mibBytes,
+		uint64(pCfg.BallastSizeMiB)*mibBytes,
+		logger,
+	)
+}
diff --git a/processor/memorylimiter/factory_test.go b/processor/memorylimiter/factory_test.go
new file mode 100644
index 0000000000..895f5ba68e
--- /dev/null
+++ b/processor/memorylimiter/factory_test.go
@@ -0,0 +1,69 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memorylimiter
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
+	"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
+)
+
+func TestCreateDefaultConfig(t *testing.T) {
+	factory := &Factory{}
+	require.NotNil(t, factory)
+
+	cfg := factory.CreateDefaultConfig()
+	assert.NotNil(t, cfg, "failed to create default config")
+	assert.NoError(t, configcheck.ValidateConfig(cfg))
+}
+
+func TestCreateProcessor(t *testing.T) {
+	factory := &Factory{}
+	require.NotNil(t, factory)
+
+	cfg := factory.CreateDefaultConfig()
+
+	// This processor can't be created with the default config.
+	tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
+	assert.Nil(t, tp)
+	assert.Error(t, err, "created processor with invalid settings")
+
+	mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
+	assert.Nil(t, mp)
+	assert.Error(t, err, "created processor with invalid settings")
+
+	// Create processor with a valid config.
+	pCfg := cfg.(*Config)
+	pCfg.MemoryLimitMiB = 5722
+	pCfg.MemorySpikeLimitMiB = 1907
+	pCfg.BallastSizeMiB = 2048
+	pCfg.CheckInterval = 100 * time.Millisecond
+
+	tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
+	assert.NoError(t, err)
+	assert.NotNil(t, tp)
+	assert.NoError(t, tp.Shutdown())
+
+	mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
+	assert.NoError(t, err)
+	assert.NotNil(t, mp)
+	assert.NoError(t, mp.Shutdown())
+}
diff --git a/processor/memorylimiter/memorylimiter.go b/processor/memorylimiter/memorylimiter.go
new file mode 100644
index 0000000000..dd01bb0791
--- /dev/null
+++ b/processor/memorylimiter/memorylimiter.go
@@ -0,0 +1,270 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memorylimiter
+
+import (
+	"context"
+	"errors"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector/component"
+	"github.com/open-telemetry/opentelemetry-collector/consumer"
+	"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
+	"github.com/open-telemetry/opentelemetry-collector/processor"
+)
+
+var (
+	// errForcedDrop is returned to callers of ConsumeTraceData and
+	// ConsumeMetricsData when data is dropped due to high memory usage.
+	errForcedDrop = errors.New("data dropped due to high memory usage")
+
+	// Construction errors
+
+	errNilNextConsumer = errors.New("nil nextConsumer")
+
+	errCheckIntervalOutOfRange = errors.New(
+		"checkInterval must be greater than zero")
+
+	errMemAllocLimitOutOfRange = errors.New(
+		"memAllocLimit must be greater than zero")
+
+	errMemSpikeLimitOutOfRange = errors.New(
+		"memSpikeLimit must be smaller than memAllocLimit")
+)
+
+// memoryLimiter is a processor that refuses incoming trace and metrics data
+// while the measured memory usage is too close to, or above, the configured
+// limit.
+type memoryLimiter struct {
+	traceConsumer   consumer.TraceConsumer
+	metricsConsumer consumer.MetricsConsumer
+
+	// Limits and ballast size are in bytes; memCheckWait is the check interval.
+	memAllocLimit uint64
+	memSpikeLimit uint64
+	memCheckWait  time.Duration
+	ballastSize   uint64
+
+	// forceDrop is used atomically to indicate when data should be dropped.
+	forceDrop int64
+
+	// ticker triggers the periodic memory checks.
+	ticker *time.Ticker
+
+	// done is closed by Shutdown (guarded by shutdownOnce) so that the
+	// monitoring goroutine exits.
+	done         chan struct{}
+	shutdownOnce sync.Once
+
+	// The function to read the mem values is set as a reference to help with
+	// testing different values.
+	readMemStatsFn func(m *runtime.MemStats)
+
+	// statsTags carries the processor name label attached to recorded stats.
+	statsTags []tag.Mutator
+
+	// Fields used for logging.
+	procName               string
+	logger                 *zap.Logger
+	configMismatchedLogged bool
+}
+
+var _ processor.DualTypeProcessor = (*memoryLimiter)(nil)
+
+// New returns a new memorylimiter processor and starts its background memory
+// monitoring. At least one of traceConsumer and metricsConsumer must be
+// non-nil, checkInterval and memAllocLimit must be greater than zero, and
+// memSpikeLimit must be smaller than memAllocLimit; otherwise an error is
+// returned. memAllocLimit, memSpikeLimit and ballastSize are in bytes. Call
+// Shutdown on the returned processor to stop the monitoring.
+func New(
+	name string,
+	traceConsumer consumer.TraceConsumer,
+	metricsConsumer consumer.MetricsConsumer,
+	checkInterval time.Duration,
+	memAllocLimit uint64,
+	memSpikeLimit uint64,
+	ballastSize uint64,
+	logger *zap.Logger,
+) (processor.DualTypeProcessor, error) {
+
+	if traceConsumer == nil && metricsConsumer == nil {
+		return nil, errNilNextConsumer
+	}
+	if checkInterval <= 0 {
+		return nil, errCheckIntervalOutOfRange
+	}
+	if memAllocLimit == 0 {
+		return nil, errMemAllocLimitOutOfRange
+	}
+	if memSpikeLimit >= memAllocLimit {
+		return nil, errMemSpikeLimitOutOfRange
+	}
+
+	ml := &memoryLimiter{
+		traceConsumer:   traceConsumer,
+		metricsConsumer: metricsConsumer,
+		memAllocLimit:   memAllocLimit,
+		memSpikeLimit:   memSpikeLimit,
+		memCheckWait:    checkInterval,
+		ballastSize:     ballastSize,
+		ticker:          time.NewTicker(checkInterval),
+		done:            make(chan struct{}),
+		readMemStatsFn:  runtime.ReadMemStats,
+		statsTags:       statsTagsForBatch(name),
+		procName:        name,
+		logger:          logger,
+	}
+
+	initMetrics()
+
+	ml.startMonitoring()
+
+	return ml, nil
+}
+
+// ConsumeTraceData forwards td to the next trace consumer, unless the limiter
+// is currently forcing drops, in which case the dropped spans are counted and
+// errForcedDrop is returned.
+func (ml *memoryLimiter) ConsumeTraceData(
+	ctx context.Context,
+	td consumerdata.TraceData,
+) error {
+
+	if ml.forcingDrop() {
+		numSpans := len(td.Spans)
+		stats.RecordWithTags(
+			context.Background(),
+			ml.statsTags,
+			StatDroppedSpanCount.M(int64(numSpans)))
+
+		return errForcedDrop
+	}
+	return ml.traceConsumer.ConsumeTraceData(ctx, td)
+}
+
+// ConsumeMetricsData forwards md to the next metrics consumer, unless the
+// limiter is currently forcing drops, in which case the dropped metrics are
+// counted and errForcedDrop is returned.
+func (ml *memoryLimiter) ConsumeMetricsData(
+	ctx context.Context,
+	md consumerdata.MetricsData,
+) error {
+
+	if ml.forcingDrop() {
+		numMetrics := len(md.Metrics)
+		stats.RecordWithTags(
+			context.Background(),
+			ml.statsTags,
+			StatDroppedMetricCount.M(int64(numMetrics)))
+
+		return errForcedDrop
+	}
+	return ml.metricsConsumer.ConsumeMetricsData(ctx, md)
+}
+
+// GetCapabilities reports that this processor does not mutate consumed data.
+func (ml *memoryLimiter) GetCapabilities() processor.Capabilities {
+	return processor.Capabilities{MutatesConsumedData: false}
+}
+
+// Start is a no-op: monitoring is already started by New.
+func (ml *memoryLimiter) Start(host component.Host) error {
+	return nil
+}
+
+// Shutdown stops the ticker and the monitoring goroutine. Stopping the ticker
+// alone is not enough because Ticker.Stop does not close the ticker channel,
+// which would leave the goroutine blocked forever. It is safe to call more
+// than once.
+func (ml *memoryLimiter) Shutdown() error {
+	ml.shutdownOnce.Do(func() {
+		ml.ticker.Stop()
+		close(ml.done)
+	})
+	return nil
+}
+
+// readMemStats fills ms using readMemStatsFn and subtracts the configured
+// ballast size from ms.Alloc so that the limits apply to non-ballast memory.
+func (ml *memoryLimiter) readMemStats(ms *runtime.MemStats) {
+	ml.readMemStatsFn(ms)
+	// If properly configured ms.Alloc should be at least ml.ballastSize but
+	// since a misconfiguration is possible check for that here.
+	if ms.Alloc >= ml.ballastSize {
+		ms.Alloc -= ml.ballastSize
+	} else {
+		// This indicates misconfiguration. Log it once.
+		if !ml.configMismatchedLogged {
+			ml.configMismatchedLogged = true
+			ml.logger.Warn(typeStr+" is likely incorrectly configured. "+ballastSizeMibKey+
+				" must be set equal to --mem-ballast-size-mib command line option.",
+				zap.String("processor", ml.procName))
+		}
+	}
+}
+
+// startMonitoring starts a ticker'd goroutine that will check memory usage
+// every checkInterval period until Shutdown closes ml.done.
+func (ml *memoryLimiter) startMonitoring() {
+	go func() {
+		for {
+			select {
+			case <-ml.ticker.C:
+				ml.memCheck()
+			case <-ml.done:
+				return
+			}
+		}
+	}()
+}
+
+// forcingDrop indicates when memory resources need to be released.
+func (ml *memoryLimiter) forcingDrop() bool {
+	return atomic.LoadInt64(&ml.forceDrop) != 0
+}
+
+// memCheck takes one memory measurement and updates the drop decision.
+func (ml *memoryLimiter) memCheck() {
+	ms := &runtime.MemStats{}
+	ml.readMemStats(ms)
+	ml.memLimiting(ms)
+}
+
+// shouldForceDrop reports whether the allocation in ms has reached the limit
+// or is within memSpikeLimit of it.
+func (ml *memoryLimiter) shouldForceDrop(ms *runtime.MemStats) bool {
+	return ml.memAllocLimit <= ms.Alloc || ml.memAllocLimit-ms.Alloc <= ml.memSpikeLimit
+}
+
+// memLimiting sets or clears the forceDrop flag according to ms and, when
+// dropping, forces a garbage collection.
+func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) {
+	if !ml.shouldForceDrop(ms) {
+		atomic.StoreInt64(&ml.forceDrop, 0)
+	} else {
+		atomic.StoreInt64(&ml.forceDrop, 1)
+		// Force a GC at this point and see if this is enough to get to
+		// the desired level.
+		runtime.GC()
+	}
+}
diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go
new file mode 100644
index 0000000000..8ae7217e10
--- /dev/null
+++ b/processor/memorylimiter/memorylimiter_test.go
@@ -0,0 +1,217 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memorylimiter
+
+import (
+	"context"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector/consumer"
+	"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
+	"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
+)
+
+func TestNew(t *testing.T) {
+	type args struct {
+		nextConsumer  consumer.TraceConsumer
+		checkInterval time.Duration
+		memAllocLimit uint64
+		memSpikeLimit uint64
+		ballastSize   uint64
+	}
+	sink := new(exportertest.SinkTraceExporter)
+	tests := []struct {
+		name    string
+		args    args
+		wantErr error
+	}{
+		{
+			name:    "nil_nextConsumer",
+			wantErr: errNilNextConsumer,
+		},
+		{
+			name: "zero_checkInterval",
+			args: args{
+				nextConsumer: sink,
+			},
+			wantErr: errCheckIntervalOutOfRange,
+		},
+		{
+			name: "zero_memAllocLimit",
+			args: args{
+				nextConsumer:  sink,
+				checkInterval: 100 * time.Millisecond,
+			},
+			wantErr: errMemAllocLimitOutOfRange,
+		},
+		{
+			name: "memSpikeLimit_gt_memAllocLimit",
+			args: args{
+				nextConsumer:  sink,
+				checkInterval: 100 * time.Millisecond,
+				memAllocLimit: 1024,
+				memSpikeLimit: 2048,
+			},
+			wantErr: errMemSpikeLimitOutOfRange,
+		},
+		{
+			name: "success",
+			args: args{
+				nextConsumer:  sink,
+				checkInterval: 100 * time.Millisecond,
+				memAllocLimit: 1e10,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := New(
+				"test",
+				tt.args.nextConsumer,
+				nil,
+				tt.args.checkInterval,
+				tt.args.memAllocLimit,
+				tt.args.memSpikeLimit,
+				tt.args.ballastSize,
+				zap.NewNop())
+			if err != tt.wantErr {
+				t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != nil {
+				assert.NoError(t, got.Shutdown())
+			}
+		})
+	}
+}
+
+// TestMetricsMemoryPressureResponse manipulates results from querying memory and
+// check expected side effects.
+func TestMetricsMemoryPressureResponse(t *testing.T) {
+	var currentMemAlloc uint64
+	sink := new(exportertest.SinkMetricsExporter)
+	ml := &memoryLimiter{
+		metricsConsumer: sink,
+		memAllocLimit:   1024,
+		readMemStatsFn: func(ms *runtime.MemStats) {
+			ms.Alloc = currentMemAlloc
+		},
+	}
+
+	ctx := context.Background()
+	td := consumerdata.MetricsData{}
+
+	// Below memAllocLimit.
+	currentMemAlloc = 800
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
+
+	// Above memAllocLimit.
+	currentMemAlloc = 1800
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
+
+	// Check ballast effect
+	ml.ballastSize = 1000
+
+	// Below memAllocLimit accounting for ballast.
+	currentMemAlloc = 800 + ml.ballastSize
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
+
+	// Above memAllocLimit even accounting for ballast.
+	currentMemAlloc = 1800 + ml.ballastSize
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
+
+	// Restore ballast to default.
+	ml.ballastSize = 0
+
+	// Check spike limit
+	ml.memSpikeLimit = 512
+
+	// Below memSpikeLimit.
+	currentMemAlloc = 500
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeMetricsData(ctx, td))
+
+	// Above memSpikeLimit.
+	currentMemAlloc = 550
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td))
+
+}
+
+// TestTraceMemoryPressureResponse manipulates results from querying memory and
+// check expected side effects.
+func TestTraceMemoryPressureResponse(t *testing.T) {
+	var currentMemAlloc uint64
+	sink := new(exportertest.SinkTraceExporter)
+	ml := &memoryLimiter{
+		traceConsumer: sink,
+		memAllocLimit: 1024,
+		readMemStatsFn: func(ms *runtime.MemStats) {
+			ms.Alloc = currentMemAlloc
+		},
+	}
+
+	ctx := context.Background()
+	td := consumerdata.TraceData{}
+
+	// Below memAllocLimit.
+	currentMemAlloc = 800
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeTraceData(ctx, td))
+
+	// Above memAllocLimit.
+	currentMemAlloc = 1800
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
+
+	// Check ballast effect
+	ml.ballastSize = 1000
+
+	// Below memAllocLimit accounting for ballast.
+	currentMemAlloc = 800 + ml.ballastSize
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeTraceData(ctx, td))
+
+	// Above memAllocLimit even accounting for ballast.
+	currentMemAlloc = 1800 + ml.ballastSize
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
+
+	// Restore ballast to default.
+	ml.ballastSize = 0
+
+	// Check spike limit
+	ml.memSpikeLimit = 512
+
+	// Below memSpikeLimit.
+	currentMemAlloc = 500
+	ml.memCheck()
+	assert.NoError(t, ml.ConsumeTraceData(ctx, td))
+
+	// Above memSpikeLimit.
+	currentMemAlloc = 550
+	ml.memCheck()
+	assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td))
+
+}
diff --git a/processor/memorylimiter/metrics.go b/processor/memorylimiter/metrics.go
new file mode 100644
index 0000000000..f60b1dea8f
--- /dev/null
+++ b/processor/memorylimiter/metrics.go
@@ -0,0 +1,94 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file contains metrics to record dropped data via memory limiter,
+// the package and its init wouldn't be necessary when proper dependencies are
+// exposed via packages.
+
+package memorylimiter
+
+import (
+	"sync"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/stats/view"
+	"go.opencensus.io/tag"
+)
+
+// Keys and stats for telemetry.
+var (
+	TagExporterNameKey, _ = tag.NewKey("exporter")
+
+	StatDroppedSpanCount = stats.Int64(
+		"spans_dropped",
+		"counts the number of spans dropped",
+		stats.UnitDimensionless)
+
+	StatDroppedMetricCount = stats.Int64(
+		"metrics_dropped",
+		"counts the number of metrics dropped",
+		stats.UnitDimensionless)
+)
+
+var initOnce sync.Once
+
+// initMetrics registers the views for dropped data exactly once. Every view
+// needs its own name: a second view with an already-used name is rejected.
+func initMetrics() {
+	initOnce.Do(func() {
+		tagKeys := []tag.Key{TagExporterNameKey}
+		droppedSpanBatchesView := &view.View{
+			Name:        "batches_dropped",
+			Measure:     StatDroppedSpanCount,
+			Description: "The number of span batches dropped.",
+			TagKeys:     tagKeys,
+			Aggregation: view.Count(),
+		}
+		droppedSpansView := &view.View{
+			Name:        StatDroppedSpanCount.Name(),
+			Measure:     StatDroppedSpanCount,
+			Description: "The number of spans dropped.",
+			TagKeys:     tagKeys,
+			Aggregation: view.Sum(),
+		}
+
+		droppedMetricBatchesView := &view.View{
+			Name:        "metric_batches_dropped",
+			Measure:     StatDroppedMetricCount,
+			Description: "The number of metric batches dropped.",
+			TagKeys:     tagKeys,
+			Aggregation: view.Count(),
+		}
+		droppedMetricsView := &view.View{
+			Name:        StatDroppedMetricCount.Name(),
+			Measure:     StatDroppedMetricCount,
+			Description: "The number of metrics dropped.",
+			TagKeys:     tagKeys,
+			Aggregation: view.Sum(),
+		}
+		// NOTE(review): the error is discarded because initMetrics cannot report it.
+		_ = view.Register(droppedSpanBatchesView, droppedSpansView, droppedMetricBatchesView, droppedMetricsView)
+	})
+}
+
+// statsTagsForBatch creates a tag.Mutator that can be used to add a metric
+// label with the processorName to context.Context via stats.RecordWithTags
+// function. This ensures uniformity of labels for the metrics.
+func statsTagsForBatch(processorName string) []tag.Mutator {
+	statsTags := []tag.Mutator{
+		tag.Upsert(TagExporterNameKey, processorName),
+	}
+
+	return statsTags
+}
diff --git a/processor/memorylimiter/testdata/config.yaml b/processor/memorylimiter/testdata/config.yaml
new file mode 100644
index 0000000000..5f6504ba93
--- /dev/null
+++ b/processor/memorylimiter/testdata/config.yaml
@@ -0,0 +1,36 @@
+receivers:
+  examplereceiver:
+
+processors:
+  memory_limiter:
+    # empty config
+
+  memory_limiter/with-settings:
+    # check_interval is the time between measurements of memory usage for the
+    # purposes of avoiding going over the limits. It must be greater than zero.
+    # Values below 1 second are not recommended since they can result in
+    # unnecessary CPU consumption.
+    check_interval: 5s
+
+    # Maximum amount of memory, in MiB, targeted to be allocated by the process heap.
+    # Note that typically the total memory usage of process will be about 50MiB higher
+    # than this value.
+    limit_mib: 4000
+
+    # The maximum, in MiB, spike expected between the measurements of memory usage.
+    spike_limit_mib: 500
+
+    # ballast_size_mib is the size, in MiB, of the memory ballast being used by the process.
+    # This must match the value of mem-ballast-size-mib command line option (if used)
+    # otherwise the memory limiter will not work correctly.
+    ballast_size_mib: 2000
+
+exporters:
+  exampleexporter:
+
+service:
+  pipelines:
+    traces:
+      receivers: [examplereceiver]
+      processors: [memory_limiter/with-settings]
+      exporters: [exampleexporter]
diff --git a/processor/processor.go b/processor/processor.go
index 4e2115877a..88497a986e 100644
--- a/processor/processor.go
+++ b/processor/processor.go
@@ -41,6 +41,14 @@ type MetricsProcessor interface {
 	Processor
 }
 
+// DualTypeProcessor is a processor that can consume both trace data and
+// metrics data.
+type DualTypeProcessor interface {
+	consumer.TraceConsumer
+	consumer.MetricsConsumer
+	Processor
+}
+
 // Capabilities describes the capabilities of TraceProcessor or MetricsProcessor.
 type Capabilities struct {
 	// MutatesConsumedData is set to true if ConsumeTraceData or ConsumeMetricsData
diff --git a/testbed/tests/testdata/memory-limiter.yaml b/testbed/tests/testdata/memory-limiter.yaml
new file mode 100644
index 0000000000..d2d52a2541
--- /dev/null
+++ b/testbed/tests/testdata/memory-limiter.yaml
@@ -0,0 +1,29 @@
+receivers:
+  jaeger:
+    protocols:
+      thrift-http:
+        endpoint: "localhost:14268"
+  opencensus:
+    endpoint: "localhost:55678"
+
+exporters:
+  opencensus:
+    endpoint: "localhost:56565"
+  logging:
+    loglevel: debug
+
+processors:
+  queued_retry:
+  memory_limiter:
+    check_interval: 1s
+    limit_mib: 10
+
+service:
+  pipelines:
+    traces:
+      receivers: [jaeger]
+      processors: [memory_limiter,queued_retry]
+      exporters: [opencensus,logging]
+    metrics:
+      receivers: [opencensus]
+      exporters: [opencensus,logging]
diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go
index c6d3cc1e90..1654d880ac 100644
--- a/testbed/tests/trace_test.go
+++ b/testbed/tests/trace_test.go
@@ -24,6 +24,8 @@ import (
 	"path"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/open-telemetry/opentelemetry-collector/testbed/testbed"
 )
 
@@ -73,22 +75,50 @@ func TestTrace10kSPS(t *testing.T) {
 }
 
 func TestTraceNoBackend10kSPSJaeger(t *testing.T) {
-	tc := testbed.NewTestCase(
-		t,
-		testbed.NewJaegerThriftDataSender(testbed.DefaultJaegerPort),
-		testbed.NewOCDataReceiver(testbed.DefaultOCPort),
-	)
-	defer tc.Stop()
+	tests := []struct {
+		name                string
+		configFileName      string
+		expectedMaxRAM      uint32
+		expectedMinFinalRAM uint32
+	}{
+		{name: "NoMemoryLimiter", configFileName: "agent-config.yaml", expectedMaxRAM: 200, expectedMinFinalRAM: 100},
 
-	tc.SetResourceLimits(testbed.ResourceSpec{
-		ExpectedMaxCPU: 60,
-		ExpectedMaxRAM: 198,
-	})
+		// Memory limiter in memory-limiter.yaml is configured to allow max 10MiB of heap size.
+		// However, heap is not the only memory user, so the total limit we set for this
+		// test is 60MiB. Note: to ensure this test verifies memorylimiter correctly
+		// expectedMaxRAM of this test case must be lower than expectedMinFinalRAM of the
+		// previous test case (which runs without memorylimiter).
+		{name: "MemoryLimiter", configFileName: "memory-limiter.yaml", expectedMaxRAM: 60, expectedMinFinalRAM: 10},
+	}
 
-	tc.StartAgent()
-	tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000})
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
 
-	tc.Sleep(tc.Duration)
+			configFilePath := path.Join("testdata", test.configFileName)
+
+			tc := testbed.NewTestCase(
+				t,
+				testbed.NewJaegerThriftDataSender(testbed.DefaultJaegerPort),
+				testbed.NewOCDataReceiver(testbed.DefaultOCPort),
+				testbed.WithConfigFile(configFilePath),
+			)
+			defer tc.Stop()
+
+			tc.SetResourceLimits(testbed.ResourceSpec{
+				ExpectedMaxCPU: 60,
+				ExpectedMaxRAM: test.expectedMaxRAM,
+			})
+
+			tc.StartAgent()
+			tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000})
+
+			tc.Sleep(tc.Duration)
+
+			// NOTE(review): the third result of AgentMemoryInfo is discarded; confirm it is not an error.
+			rss, _, _ := tc.AgentMemoryInfo()
+			assert.True(t, rss > test.expectedMinFinalRAM)
+		})
+	}
 }
 
 func TestTrace1kSPSWithAttrs(t *testing.T) {