// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package memorylimiter // import "go.opentelemetry.io/collector/internal/memorylimiter"

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/zap"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
)

const (
	mibBytes = 1024 * 1024
)

var (
	// ErrDataRefused is returned to callers of the Consume* functions to indicate
	// that data is being refused due to high memory usage.
	ErrDataRefused = errors.New("data refused due to high memory usage")

	// ErrShutdownNotStarted indicates that Shutdown was called before the memory
	// limiter was started, i.e. no monitoring routine is running.
	ErrShutdownNotStarted = errors.New("no existing monitoring routine is running")

	// GetMemoryFn and ReadMemStatsFn are package-level variables so that tests can override them.
	GetMemoryFn    = iruntime.TotalMemory
	ReadMemStatsFn = runtime.ReadMemStats
)
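
// The two variables above exist only so tests can inject deterministic behavior.
// A minimal sketch of such an override (illustrative values, not part of the
// original file; mibBytes is the package constant defined above):
//
//	GetMemoryFn = func() (uint64, error) { return 8 * 1024 * mibBytes, nil }  // pretend 8 GiB of total memory
//	ReadMemStatsFn = func(ms *runtime.MemStats) { ms.Alloc = 100 * mibBytes } // pretend 100 MiB currently allocated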

// MemoryLimiter is used to prevent out of memory situations on the collector.
type MemoryLimiter struct {
	usageChecker memUsageChecker

	memCheckWait time.Duration

	// mustRefuse is used to indicate when data should be refused.
	mustRefuse *atomic.Bool

	ticker *time.Ticker

	minGCIntervalWhenSoftLimited time.Duration
	minGCIntervalWhenHardLimited time.Duration
	lastGCDone                   time.Time

	// The functions to read memory stats and to run GC are stored as references
	// so that tests can substitute implementations and exercise different values.
	readMemStatsFn func(m *runtime.MemStats)
	runGCFn        func()

	// Fields used for logging.
	logger *zap.Logger

	refCounterLock sync.Mutex
	refCounter     int
	waitGroup      sync.WaitGroup
	closed         chan struct{}
}

// NewMemoryLimiter returns a new memory limiter component.
func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) {
	usageChecker, err := getMemUsageChecker(cfg, logger)
	if err != nil {
		return nil, err
	}

	logger.Info("Memory limiter configured",
		zap.Uint64("limit_mib", usageChecker.memAllocLimit/mibBytes),
		zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit/mibBytes),
		zap.Duration("check_interval", cfg.CheckInterval))

	return &MemoryLimiter{
		usageChecker:                 *usageChecker,
		memCheckWait:                 cfg.CheckInterval,
		ticker:                       time.NewTicker(cfg.CheckInterval),
		minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited,
		minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited,
		lastGCDone:                   time.Now(),
		readMemStatsFn:               ReadMemStatsFn,
		runGCFn:                      runtime.GC,
		logger:                       logger,
		mustRefuse:                   &atomic.Bool{},
	}, nil
}

// Start begins periodic memory checks. Start is reference counted: the first
// call launches the monitoring goroutine and subsequent calls reuse it.
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
	ml.refCounterLock.Lock()
	defer ml.refCounterLock.Unlock()

	ml.refCounter++
	if ml.refCounter == 1 {
		ml.closed = make(chan struct{})
		ml.waitGroup.Add(1)
		go func() {
			defer ml.waitGroup.Done()

			for {
				select {
				case <-ml.ticker.C:
				case <-ml.closed:
					return
				}
				ml.CheckMemLimits()
			}
		}()
	}
	return nil
}

// Shutdown stops monitoring: when the last remaining user calls it, the ticker
// is stopped and the monitoring goroutine exits.
func (ml *MemoryLimiter) Shutdown(context.Context) error {
	ml.refCounterLock.Lock()
	defer ml.refCounterLock.Unlock()

	switch ml.refCounter {
	case 0:
		return ErrShutdownNotStarted
	case 1:
		ml.ticker.Stop()
		close(ml.closed)
		ml.waitGroup.Wait()
	}
	ml.refCounter--
	return nil
}

// MustRefuse reports whether the caller should refuse data because memory
// usage has reached its configured limits.
func (ml *MemoryLimiter) MustRefuse() bool {
	return ml.mustRefuse.Load()
}
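
// exampleRefusalFlow is an illustrative sketch only, not part of the original
// file: it shows one way a caller might drive the limiter lifecycle and consult
// MustRefuse before accepting data. The function name and the surrounding error
// handling are hypothetical; the limiter API calls are the ones defined above.
func exampleRefusalFlow(cfg *Config, logger *zap.Logger) error {
	ml, err := NewMemoryLimiter(cfg, logger)
	if err != nil {
		return err
	}
	// Start launches the periodic CheckMemLimits goroutine; this implementation
	// ignores the host argument, so nil is acceptable for the sketch.
	if err := ml.Start(context.Background(), nil); err != nil {
		return err
	}
	defer func() { _ = ml.Shutdown(context.Background()) }()

	if ml.MustRefuse() {
		// Surface ErrDataRefused so producers can retry once memory pressure subsides.
		return ErrDataRefused
	}
	// ... accept and process data here ...
	return nil
}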

func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) {
	memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
	memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
	if cfg.MemoryLimitMiB != 0 {
		return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil
	}
	totalMemory, err := GetMemoryFn()
	if err != nil {
		return nil, fmt.Errorf("failed to get total memory, use fixed memory settings (limit_mib): %w", err)
	}
	logger.Info("Using percentage memory limiter",
		zap.Uint64("total_memory_mib", totalMemory/mibBytes),
		zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
		zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
	return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage),
		uint64(cfg.MemorySpikePercentage)), nil
}
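
// For illustration (hypothetical values, not from the original file): setting
// MemoryLimitMiB: 4000 with MemorySpikeLimitMiB: 800 selects the fixed checker
// above, while MemoryLimitMiB: 0 with MemoryLimitPercentage: 80 and
// MemorySpikePercentage: 25 derives the limits from the detected total memory.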

func (ml *MemoryLimiter) readMemStats() *runtime.MemStats {
	ms := &runtime.MemStats{}
	ml.readMemStatsFn(ms)
	return ms
}

func memstatToZapField(ms *runtime.MemStats) zap.Field {
	return zap.Uint64("cur_mem_mib", ms.Alloc/mibBytes)
}

func (ml *MemoryLimiter) doGCandReadMemStats() *runtime.MemStats {
	ml.runGCFn()
	ml.lastGCDone = time.Now()
	ms := ml.readMemStats()
	ml.logger.Info("Memory usage after GC.", memstatToZapField(ms))
	return ms
}

// CheckMemLimits compares the current memory usage against the configured
// thresholds and toggles mustRefuse when a threshold is exceeded.
func (ml *MemoryLimiter) CheckMemLimits() {
	ms := ml.readMemStats()

	ml.logger.Debug("Currently used memory.", memstatToZapField(ms))

	// Check if we are below the soft limit.
	aboveSoftLimit := ml.usageChecker.aboveSoftLimit(ms)
	if !aboveSoftLimit {
		if ml.mustRefuse.Load() {
			// Was previously refusing but enough memory is available now, no need to limit.
			ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
		}
		ml.mustRefuse.Store(aboveSoftLimit)
		return
	}

	if ml.usageChecker.aboveHardLimit(ms) {
		// We are above the hard limit: run a GC if one wasn't done recently and
		// see if it brings memory usage below the soft limit.
		if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenHardLimited {
			ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
			ms = ml.doGCandReadMemStats()
			// Check the limit again to see if GC helped.
			aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
		}
	} else {
		// We are above the soft limit: run a GC if one wasn't done recently and
		// see if it brings memory usage below the soft limit.
		if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenSoftLimited {
			ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
			ms = ml.doGCandReadMemStats()
			// Check the limit again to see if GC helped.
			aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
		}
	}

	if !ml.mustRefuse.Load() && aboveSoftLimit {
		ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
	}

	ml.mustRefuse.Store(aboveSoftLimit)
}
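
// Summary of the flow above (descriptive note, not part of the original file):
// below the soft limit data is accepted (and recovery is logged if the limiter
// was refusing); between the soft and hard limits data is refused and a GC is
// forced at most once per minGCIntervalWhenSoftLimited; at or above the hard
// limit data is refused and a GC is forced at most once per
// minGCIntervalWhenHardLimited. mustRefuse is cleared once a later check, or a
// post-GC reading, finds usage back below the soft limit.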

type memUsageChecker struct {
	memAllocLimit uint64
	memSpikeLimit uint64
}

func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool {
	return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit
}

func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
	return ms.Alloc >= d.memAllocLimit
}
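
// Worked example (illustrative values, not from the original file): with
// memAllocLimit = 4000 MiB and memSpikeLimit = 800 MiB, the hard limit is
// 4000 MiB and the soft limit is 4000 - 800 = 3200 MiB, so refusal (and the
// soft-limited GC path) starts once Alloc reaches 3200 MiB, and the more
// aggressive hard-limited GC path starts at 4000 MiB.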

func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker {
	if memSpikeLimit == 0 {
		// If spike limit is unspecified use 20% of mem limit.
		memSpikeLimit = memAllocLimit / 5
	}
	return &memUsageChecker{
		memAllocLimit: memAllocLimit,
		memSpikeLimit: memSpikeLimit,
	}
}

func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker {
	return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100)
}
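
// Worked example (illustrative values, not from the original file): with
// 10240 MiB of total memory, percentageLimit = 80 and percentageSpike = 25,
// the resulting checker has memAllocLimit = 80*10240/100 = 8192 MiB and
// memSpikeLimit = 25*10240/100 = 2560 MiB, for a soft limit of
// 8192 - 2560 = 5632 MiB.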