opentelemetry-collector/internal/memorylimiter/memorylimiter.go

246 lines
7.3 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package memorylimiter // import "go.opentelemetry.io/collector/internal/memorylimiter"
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
)
const (
	// mibBytes is the number of bytes in one mebibyte (MiB); used to convert
	// the MiB-denominated config values and to render byte counts in logs.
	mibBytes = 1024 * 1024
)
var (
	// ErrDataRefused is returned to callers to indicate that data is being
	// refused due to high memory usage.
	ErrDataRefused = errors.New("data refused due to high memory usage")
	// ErrShutdownNotStarted is returned by Shutdown when no monitoring routine
	// is running, i.e. Shutdown was called before (or more times than) Start.
	ErrShutdownNotStarted = errors.New("no existing monitoring routine is running")
	// GetMemoryFn and ReadMemStatsFn are indirections over iruntime.TotalMemory
	// and runtime.ReadMemStats so tests can substitute fake memory readings.
	GetMemoryFn    = iruntime.TotalMemory
	ReadMemStatsFn = runtime.ReadMemStats
)
// MemoryLimiter is used to prevent out of memory situations on the collector.
type MemoryLimiter struct {
	// usageChecker holds the soft/hard limit thresholds that are compared
	// against runtime.MemStats.Alloc on each check.
	usageChecker memUsageChecker

	// memCheckWait is how often memory usage is checked (cfg.CheckInterval).
	memCheckWait time.Duration

	// mustRefuse is used to indicate when data should be refused.
	mustRefuse *atomic.Bool

	// ticker drives the periodic checks run by the monitoring goroutine.
	ticker *time.Ticker

	// Minimum time that must have elapsed since lastGCDone before another
	// forced GC is attempted while above the soft / hard limit respectively.
	minGCIntervalWhenSoftLimited time.Duration
	minGCIntervalWhenHardLimited time.Duration
	// lastGCDone records when the last forced GC finished.
	// NOTE(review): written without synchronization by doGCandReadMemStats —
	// assumes CheckMemLimits only runs on the single monitoring goroutine.
	lastGCDone time.Time

	// The functions to read the mem values and run GC are set as a reference to help with
	// testing different values.
	readMemStatsFn func(m *runtime.MemStats)
	runGCFn        func()

	// Fields used for logging.
	logger *zap.Logger

	// refCounter counts Start calls minus Shutdown calls; the monitoring
	// goroutine runs while it is > 0. Guarded by refCounterLock.
	refCounterLock sync.Mutex
	refCounter     int
	// waitGroup lets Shutdown wait for the monitoring goroutine to exit.
	waitGroup sync.WaitGroup
	// closed signals the monitoring goroutine to stop; created on each start.
	closed chan struct{}
}
// NewMemoryLimiter returns a new memory limiter component
func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) {
	// Resolve the fixed or percentage-based limits from the config first;
	// this is the only operation that can fail.
	checker, err := getMemUsageChecker(cfg, logger)
	if err != nil {
		return nil, err
	}

	logger.Info("Memory limiter configured",
		zap.Uint64("limit_mib", checker.memAllocLimit/mibBytes),
		zap.Uint64("spike_limit_mib", checker.memSpikeLimit/mibBytes),
		zap.Duration("check_interval", cfg.CheckInterval))

	ml := &MemoryLimiter{
		usageChecker:                 *checker,
		memCheckWait:                 cfg.CheckInterval,
		ticker:                       time.NewTicker(cfg.CheckInterval),
		minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited,
		minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited,
		lastGCDone:                   time.Now(),
		// Real implementations by default; tests may swap these out.
		readMemStatsFn: ReadMemStatsFn,
		runGCFn:        runtime.GC,
		mustRefuse:     &atomic.Bool{},
		logger:         logger,
	}
	return ml, nil
}
// Start begins periodic memory monitoring. Start is reference counted: only
// the transition from zero to one active reference launches the monitoring
// goroutine; subsequent calls just bump the counter.
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
	ml.refCounterLock.Lock()
	defer ml.refCounterLock.Unlock()
	ml.refCounter++
	if ml.refCounter == 1 {
		// Shutdown stops the ticker; restart it so that a Start following a
		// full Shutdown works — otherwise the goroutine below would block on
		// ticker.C forever and never check memory again.
		ml.ticker.Reset(ml.memCheckWait)
		ml.closed = make(chan struct{})
		ml.waitGroup.Add(1)
		go func() {
			defer ml.waitGroup.Done()
			for {
				select {
				case <-ml.ticker.C:
				case <-ml.closed:
					// Shutdown was called: exit the monitoring loop.
					return
				}
				ml.CheckMemLimits()
			}
		}()
	}
	return nil
}
// Shutdown resets MemoryLimiter monitoring ticker and stop monitoring
func (ml *MemoryLimiter) Shutdown(context.Context) error {
	ml.refCounterLock.Lock()
	defer ml.refCounterLock.Unlock()

	// Shutdown without a matching Start is an error.
	if ml.refCounter == 0 {
		return ErrShutdownNotStarted
	}
	// Dropping the last reference: stop the ticker, signal the monitoring
	// goroutine to exit, and wait until it has finished.
	if ml.refCounter == 1 {
		ml.ticker.Stop()
		close(ml.closed)
		ml.waitGroup.Wait()
	}
	ml.refCounter--
	return nil
}
// MustRefuse reports whether the caller should refuse incoming data because
// memory usage has reached its configured limits.
func (ml *MemoryLimiter) MustRefuse() bool {
	return ml.mustRefuse.Load()
}
// getMemUsageChecker builds the limit checker from the config: fixed byte
// limits when limit_mib is set, otherwise percentages of total system memory.
func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) {
	// Fixed limits take precedence over percentage-based configuration.
	if cfg.MemoryLimitMiB != 0 {
		return newFixedMemUsageChecker(uint64(cfg.MemoryLimitMiB)*mibBytes, uint64(cfg.MemorySpikeLimitMiB)*mibBytes), nil
	}

	totalMemory, err := GetMemoryFn()
	if err != nil {
		return nil, fmt.Errorf("failed to get total memory, use fixed memory settings (limit_mib): %w", err)
	}

	logger.Info("Using percentage memory limiter",
		zap.Uint64("total_memory_mib", totalMemory/mibBytes),
		zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
		zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
	return newPercentageMemUsageChecker(totalMemory,
		uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage)), nil
}
// readMemStats collects the current memory statistics through the
// (test-overridable) readMemStatsFn indirection.
func (ml *MemoryLimiter) readMemStats() *runtime.MemStats {
	var ms runtime.MemStats
	ml.readMemStatsFn(&ms)
	return &ms
}
// memstatToZapField renders the currently allocated heap bytes, in MiB, as a
// structured log field.
func memstatToZapField(ms *runtime.MemStats) zap.Field {
	curMemMiB := ms.Alloc / mibBytes
	return zap.Uint64("cur_mem_mib", curMemMiB)
}
// doGCandReadMemStats forces a garbage collection, records the time it
// finished (used to rate-limit further forced GCs), and returns the post-GC
// memory statistics.
func (ml *MemoryLimiter) doGCandReadMemStats() *runtime.MemStats {
	ml.runGCFn()
	// Stamp completion time first so the min-GC-interval gates in
	// CheckMemLimits measure from the end of this collection.
	ml.lastGCDone = time.Now()
	stats := ml.readMemStats()
	// Log at Info so operators can see how much a forced GC reclaimed.
	ml.logger.Info("Memory usage after GC.", memstatToZapField(stats))
	return stats
}
// CheckMemLimits inspects current memory usage against threshold and toggle mustRefuse when threshold is exceeded
func (ml *MemoryLimiter) CheckMemLimits() {
	ms := ml.readMemStats()
	ml.logger.Debug("Currently used memory.", memstatToZapField(ms))

	refuse := ml.usageChecker.aboveSoftLimit(ms)
	if !refuse {
		// Below the soft limit: note the recovery if we had been refusing,
		// then clear the flag.
		if ml.mustRefuse.Load() {
			ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
		}
		ml.mustRefuse.Store(false)
		return
	}

	// Above the soft limit. Try a forced GC — rate-limited separately for the
	// hard- and soft-limited cases — and re-check whether it freed enough
	// memory to drop below the soft limit.
	if ml.usageChecker.aboveHardLimit(ms) {
		if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenHardLimited {
			ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
			ms = ml.doGCandReadMemStats()
			refuse = ml.usageChecker.aboveSoftLimit(ms)
		}
	} else if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenSoftLimited {
		ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
		ms = ml.doGCandReadMemStats()
		refuse = ml.usageChecker.aboveSoftLimit(ms)
	}

	// Log only the transition from accepting to refusing, then publish the
	// (possibly unchanged) decision.
	if !ml.mustRefuse.Load() && refuse {
		ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
	}
	ml.mustRefuse.Store(refuse)
}
// memUsageChecker evaluates allocated heap memory (runtime.MemStats.Alloc)
// against a hard limit and a derived soft limit (hard limit minus the spike
// allowance).
type memUsageChecker struct {
	// memAllocLimit is the hard limit, in bytes.
	memAllocLimit uint64
	// memSpikeLimit is the headroom, in bytes, reserved below the hard limit
	// for allocation spikes; the soft limit is memAllocLimit-memSpikeLimit.
	memSpikeLimit uint64
}

// aboveSoftLimit reports whether allocated memory is at or above the soft
// limit (hard limit minus spike allowance).
func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool {
	// Guard against unsigned underflow when the limits are misconfigured
	// (spike >= hard limit): treat the soft limit as zero, i.e. always
	// exceeded. Without this, the subtraction would wrap around and the
	// limiter would never refuse data.
	if d.memSpikeLimit >= d.memAllocLimit {
		return true
	}
	return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit
}

// aboveHardLimit reports whether allocated memory is at or above the hard limit.
func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
	return ms.Alloc >= d.memAllocLimit
}

// newFixedMemUsageChecker builds a checker from absolute byte limits. A zero
// spike limit defaults to 20% of the allocation limit.
func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker {
	if memSpikeLimit == 0 {
		// If spike limit is unspecified use 20% of mem limit.
		memSpikeLimit = memAllocLimit / 5
	}
	return &memUsageChecker{
		memAllocLimit: memAllocLimit,
		memSpikeLimit: memSpikeLimit,
	}
}

// newPercentageMemUsageChecker builds a checker whose hard and spike limits
// are percentages of total system memory.
func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker {
	return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100)
}