[processor/memory_limiter] Update config validation (#9059)
- Fix names of the config fields that are validated in the error messages - Move the validation from start to the initialization phrase
This commit is contained in:
parent
e734426571
commit
b81d4efd05
|
@ -0,0 +1,27 @@
|
|||
# Use this changelog template to create an entry for release notes.
|
||||
|
||||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: enhancement
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: processor/memory_limiter
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: Update config validation errors
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [9059]
|
||||
|
||||
# (Optional) One or more lines of additional information to render under the primary note.
|
||||
# These lines will be padded with 2 spaces and then inserted directly into the document.
|
||||
# Use pipe (|) for multiline entries.
|
||||
subtext: |
|
||||
- Fix names of the config fields that are validated in the error messages
|
||||
- Move the validation from start to the initialization phrase
|
||||
|
||||
# Optional: The change log or logs in which this entry should be included.
|
||||
# e.g. '[user]' or '[user, api]'
|
||||
# Include 'user' if the change is relevant to end users.
|
||||
# Include 'api' if there is a change to a library API.
|
||||
# Default: '[user]'
|
||||
change_logs: [user]
|
|
@ -39,5 +39,20 @@ var _ component.Config = (*Config)(nil)
|
|||
|
||||
// Validate checks if the processor configuration is valid
|
||||
func (cfg *Config) Validate() error {
|
||||
if cfg.CheckInterval <= 0 {
|
||||
return errCheckIntervalOutOfRange
|
||||
}
|
||||
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
|
||||
return errLimitOutOfRange
|
||||
}
|
||||
if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 {
|
||||
return errPercentageLimitOutOfRange
|
||||
}
|
||||
if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB {
|
||||
return errMemSpikeLimitOutOfRange
|
||||
}
|
||||
if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage {
|
||||
return errMemSpikePercentageLimitOutOfRange
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -36,3 +36,71 @@ func TestUnmarshalConfig(t *testing.T) {
|
|||
MemorySpikeLimitMiB: 500,
|
||||
}, cfg)
|
||||
}
|
||||
|
||||
func TestConfigValidate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg *Config
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "valid",
|
||||
cfg: func() *Config {
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.MemoryLimitMiB = 5722
|
||||
cfg.MemorySpikeLimitMiB = 1907
|
||||
cfg.CheckInterval = 100 * time.Millisecond
|
||||
return cfg
|
||||
}(),
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "zero check interval",
|
||||
cfg: &Config{
|
||||
CheckInterval: 0,
|
||||
},
|
||||
err: errCheckIntervalOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "unset memory limit",
|
||||
cfg: &Config{
|
||||
CheckInterval: 1 * time.Second,
|
||||
MemoryLimitMiB: 0,
|
||||
MemoryLimitPercentage: 0,
|
||||
},
|
||||
err: errLimitOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "invalid memory spike limit",
|
||||
cfg: &Config{
|
||||
CheckInterval: 1 * time.Second,
|
||||
MemoryLimitMiB: 10,
|
||||
MemorySpikeLimitMiB: 10,
|
||||
},
|
||||
err: errMemSpikeLimitOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "invalid memory percentage limit",
|
||||
cfg: &Config{
|
||||
CheckInterval: 1 * time.Second,
|
||||
MemoryLimitPercentage: 101,
|
||||
},
|
||||
err: errPercentageLimitOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "invalid memory spike percentage limit",
|
||||
cfg: &Config{
|
||||
CheckInterval: 1 * time.Second,
|
||||
MemoryLimitPercentage: 50,
|
||||
MemorySpikePercentage: 60,
|
||||
},
|
||||
err: errMemSpikePercentageLimitOutOfRange,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.cfg.Validate()
|
||||
assert.Equal(t, tt.err, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,38 +31,25 @@ func TestCreateProcessor(t *testing.T) {
|
|||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
// This processor can't be created with the default config.
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.Nil(t, tp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.Nil(t, mp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.Nil(t, lp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
// Create processor with a valid config.
|
||||
pCfg := cfg.(*Config)
|
||||
pCfg.MemoryLimitMiB = 5722
|
||||
pCfg.MemorySpikeLimitMiB = 1907
|
||||
pCfg.CheckInterval = 100 * time.Millisecond
|
||||
|
||||
tp, err = factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tp)
|
||||
// test if we can shutdown a monitoring routine that has not started
|
||||
assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted)
|
||||
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
mp, err = factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, mp)
|
||||
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
lp, err = factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, lp)
|
||||
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
|
|
@ -34,18 +34,16 @@ var (
|
|||
|
||||
// Construction errors
|
||||
|
||||
errCheckIntervalOutOfRange = errors.New(
|
||||
"checkInterval must be greater than zero")
|
||||
errCheckIntervalOutOfRange = errors.New("check_interval must be greater than zero")
|
||||
|
||||
errLimitOutOfRange = errors.New(
|
||||
"memAllocLimit or memoryLimitPercentage must be greater than zero")
|
||||
errLimitOutOfRange = errors.New("limit_mib or limit_percentage must be greater than zero")
|
||||
|
||||
errMemSpikeLimitOutOfRange = errors.New(
|
||||
"memSpikeLimit must be smaller than memAllocLimit")
|
||||
errMemSpikeLimitOutOfRange = errors.New("spike_limit_mib must be smaller than limit_mib")
|
||||
|
||||
errMemSpikePercentageLimitOutOfRange = errors.New("spike_limit_percentage must be smaller than limit_percentage")
|
||||
|
||||
errPercentageLimitOutOfRange = errors.New(
|
||||
"memoryLimitPercentage and memorySpikePercentage must be greater than zero and less than or equal to hundred",
|
||||
)
|
||||
"limit_percentage and spike_limit_percentage must be greater than zero and less than or equal to hundred")
|
||||
|
||||
errShutdownNotStarted = errors.New("no existing monitoring routine is running")
|
||||
)
|
||||
|
@ -86,13 +84,6 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second
|
|||
|
||||
// newMemoryLimiter returns a new memorylimiter processor.
|
||||
func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter, error) {
|
||||
if cfg.CheckInterval <= 0 {
|
||||
return nil, errCheckIntervalOutOfRange
|
||||
}
|
||||
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
|
||||
return nil, errLimitOutOfRange
|
||||
}
|
||||
|
||||
logger := set.Logger
|
||||
usageChecker, err := getMemUsageChecker(cfg, logger)
|
||||
if err != nil {
|
||||
|
@ -129,7 +120,7 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
|
|||
memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
|
||||
memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
|
||||
if cfg.MemoryLimitMiB != 0 {
|
||||
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit)
|
||||
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil
|
||||
}
|
||||
totalMemory, err := getMemoryFn()
|
||||
if err != nil {
|
||||
|
@ -139,7 +130,8 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
|
|||
zap.Uint64("total_memory_mib", totalMemory/mibBytes),
|
||||
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
|
||||
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
|
||||
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage))
|
||||
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage),
|
||||
uint64(cfg.MemorySpikePercentage)), nil
|
||||
}
|
||||
|
||||
func (ml *memoryLimiter) start(_ context.Context, host component.Host) error {
|
||||
|
@ -319,10 +311,7 @@ func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
|
|||
return ms.Alloc >= d.memAllocLimit
|
||||
}
|
||||
|
||||
func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) {
|
||||
if memSpikeLimit >= memAllocLimit {
|
||||
return nil, errMemSpikeLimitOutOfRange
|
||||
}
|
||||
func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker {
|
||||
if memSpikeLimit == 0 {
|
||||
// If spike limit is unspecified use 20% of mem limit.
|
||||
memSpikeLimit = memAllocLimit / 5
|
||||
|
@ -330,12 +319,9 @@ func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChec
|
|||
return &memUsageChecker{
|
||||
memAllocLimit: memAllocLimit,
|
||||
memSpikeLimit: memSpikeLimit,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) (*memUsageChecker, error) {
|
||||
if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 {
|
||||
return nil, errPercentageLimitOutOfRange
|
||||
}
|
||||
func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker {
|
||||
return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100)
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/consumer/consumertest"
|
||||
"go.opentelemetry.io/collector/internal/iruntime"
|
||||
"go.opentelemetry.io/collector/pdata/plog"
|
||||
|
@ -29,71 +28,6 @@ import (
|
|||
"go.opentelemetry.io/collector/processor/processortest"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
type args struct {
|
||||
nextConsumer consumer.Traces
|
||||
checkInterval time.Duration
|
||||
memoryLimitMiB uint32
|
||||
memorySpikeLimitMiB uint32
|
||||
}
|
||||
sink := new(consumertest.TracesSink)
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "zero_checkInterval",
|
||||
args: args{
|
||||
nextConsumer: sink,
|
||||
},
|
||||
wantErr: errCheckIntervalOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "zero_memAllocLimit",
|
||||
args: args{
|
||||
nextConsumer: sink,
|
||||
checkInterval: 100 * time.Millisecond,
|
||||
},
|
||||
wantErr: errLimitOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "memSpikeLimit_gt_memAllocLimit",
|
||||
args: args{
|
||||
nextConsumer: sink,
|
||||
checkInterval: 100 * time.Millisecond,
|
||||
memoryLimitMiB: 1,
|
||||
memorySpikeLimitMiB: 2,
|
||||
},
|
||||
wantErr: errMemSpikeLimitOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "success",
|
||||
args: args{
|
||||
nextConsumer: sink,
|
||||
checkInterval: 100 * time.Millisecond,
|
||||
memoryLimitMiB: 1024,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.CheckInterval = tt.args.checkInterval
|
||||
cfg.MemoryLimitMiB = tt.args.memoryLimitMiB
|
||||
cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB
|
||||
got, err := newMemoryLimiter(processortest.NewNopCreateSettings(), cfg)
|
||||
if tt.wantErr != nil {
|
||||
assert.ErrorIs(t, err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, got.start(context.Background(), componenttest.NewNopHost()))
|
||||
assert.NoError(t, got.shutdown(context.Background()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestMetricsMemoryPressureResponse manipulates results from querying memory and
|
||||
// check expected side effects.
|
||||
func TestMetricsMemoryPressureResponse(t *testing.T) {
|
||||
|
@ -309,11 +243,6 @@ func TestGetDecision(t *testing.T) {
|
|||
memSpikeLimit: 20 * mibBytes,
|
||||
}, d)
|
||||
})
|
||||
t.Run("fixed_limit_error", func(t *testing.T) {
|
||||
d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop())
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, d)
|
||||
})
|
||||
|
||||
t.Cleanup(func() {
|
||||
getMemoryFn = iruntime.TotalMemory
|
||||
|
@ -329,26 +258,12 @@ func TestGetDecision(t *testing.T) {
|
|||
memSpikeLimit: 10 * mibBytes,
|
||||
}, d)
|
||||
})
|
||||
t.Run("percentage_limit_error", func(t *testing.T) {
|
||||
d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop())
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, d)
|
||||
d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop())
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, d)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRefuseDecision(t *testing.T) {
|
||||
decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30)
|
||||
require.NoError(t, err)
|
||||
decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50)
|
||||
require.NoError(t, err)
|
||||
decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20)
|
||||
require.NoError(t, err)
|
||||
decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60)
|
||||
require.Error(t, err)
|
||||
assert.Nil(t, decison1000Limit40Spike60)
|
||||
decison1000Limit30Spike30 := newPercentageMemUsageChecker(1000, 60, 30)
|
||||
decison1000Limit60Spike50 := newPercentageMemUsageChecker(1000, 60, 50)
|
||||
decison1000Limit40Spike20 := newPercentageMemUsageChecker(1000, 40, 20)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
Loading…
Reference in New Issue