[service] Remove getBallastSize from service (#10696)

#### Description

This PR removes all ballast logic from service. This effectively
deprecates the ballastextension as including the extension with this
service would do nothing.

Related to
https://github.com/open-telemetry/opentelemetry-collector/pull/10671

#### Link to tracking issue
Closes
https://github.com/open-telemetry/opentelemetry-collector/issues/8342

#### Testing
Unit tests.
This commit is contained in:
Tyler Helmuth 2024-07-23 15:59:53 -06:00 committed by GitHub
parent c239e73bbb
commit 4e44e32280
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 87 additions and 195 deletions

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor/memorylimiter
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The memory limiter processor will no longer account for ballast size.
# One or more tracking issues or pull requests related to the change
issues: [10696]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension/memorylimiter
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The memory limiter extension will no longer account for ballast size.
# One or more tracking issues or pull requests related to the change
issues: [10696]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The service will no longer be able to get a ballast size from the deprecated ballast extension.
# One or more tracking issues or pull requests related to the change
issues: [10696]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []

View File

@ -44,7 +44,6 @@ type MemoryLimiter struct {
usageChecker memUsageChecker
memCheckWait time.Duration
ballastSize uint64
// mustRefuse is used to indicate when data should be refused.
mustRefuse *atomic.Bool
@ -58,8 +57,7 @@ type MemoryLimiter struct {
readMemStatsFn func(m *runtime.MemStats)
// Fields used for logging.
logger *zap.Logger
configMismatchedLogged bool
logger *zap.Logger
refCounterLock sync.Mutex
refCounter int
@ -114,14 +112,7 @@ func (ml *MemoryLimiter) startMonitoring() {
}
}
func (ml *MemoryLimiter) Start(_ context.Context, host component.Host) error {
extensions := host.GetExtensions()
for _, extension := range extensions {
if ext, ok := extension.(interface{ GetBallastSize() uint64 }); ok {
ml.ballastSize = ext.GetBallastSize()
break
}
}
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
ml.startMonitoring()
return nil
}
@ -168,16 +159,6 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
func (ml *MemoryLimiter) readMemStats() *runtime.MemStats {
ms := &runtime.MemStats{}
ml.readMemStatsFn(ms)
// If proper configured ms.Alloc should be at least ml.ballastSize but since
// a misconfiguration is possible check for that here.
if ms.Alloc >= ml.ballastSize {
ms.Alloc -= ml.ballastSize
} else if !ml.configMismatchedLogged {
// This indicates misconfiguration. Log it once.
ml.configMismatchedLogged = true
ml.logger.Warn(`"size_mib" in ballast extension is likely incorrectly configured.`)
}
return ms
}

View File

@ -4,17 +4,14 @@
package memorylimiter
import (
"context"
"runtime"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/internal/iruntime"
)
@ -43,22 +40,6 @@ func TestMemoryPressureResponse(t *testing.T) {
ml.CheckMemLimits()
assert.True(t, ml.MustRefuse())
// Check ballast effect
ml.ballastSize = 1000
// Below memAllocLimit accounting for ballast.
currentMemAlloc = 800 + ml.ballastSize
ml.CheckMemLimits()
assert.False(t, ml.MustRefuse())
// Above memAllocLimit even accounting for ballast.
currentMemAlloc = 1800 + ml.ballastSize
ml.CheckMemLimits()
assert.True(t, ml.MustRefuse())
// Restore ballast to default.
ml.ballastSize = 0
// Check spike limit
ml.usageChecker.memSpikeLimit = 512
@ -151,38 +132,3 @@ func TestRefuseDecision(t *testing.T) {
})
}
}
func TestBallastSize(t *testing.T) {
cfg := &Config{
CheckInterval: 10 * time.Second,
MemoryLimitMiB: 1024,
}
got, err := NewMemoryLimiter(cfg, zap.NewNop())
require.NoError(t, err)
got.startMonitoring()
require.NoError(t, got.Start(context.Background(), &host{ballastSize: 113}))
assert.Equal(t, uint64(113), got.ballastSize)
require.NoError(t, got.Shutdown(context.Background()))
}
type host struct {
ballastSize uint64
component.Host
}
func (h *host) GetExtensions() map[component.ID]component.Component {
ret := make(map[component.ID]component.Component)
ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize}
return ret
}
type ballastExtension struct {
ballastSize uint64
component.StartFunc
component.ShutdownFunc
}
func (be *ballastExtension) GetBallastSize() uint64 {
return be.ballastSize
}

View File

@ -122,7 +122,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
tests := []struct {
name string
mlCfg *Config
ballastSize uint64
memAlloc uint64
expectError bool
}{
@ -133,7 +132,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -144,29 +142,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 1800,
expectError: true,
},
{
name: "Below memAllocLimit accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 800,
expectError: false,
},
{
name: "Above memAllocLimit even accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 1800,
expectError: true,
},
@ -177,7 +152,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 10,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -188,7 +162,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 11,
},
ballastSize: 0,
memAlloc: 800,
expectError: true,
},
@ -197,7 +170,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc + tt.ballastSize
ms.Alloc = tt.memAlloc
}
ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg)
@ -213,7 +186,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
assert.NoError(t, mp.Start(ctx, &host{ballastSize: tt.ballastSize}))
assert.NoError(t, mp.Start(ctx, &host{}))
ml.memlimiter.CheckMemLimits()
err = mp.ConsumeMetrics(ctx, md)
if tt.expectError {
@ -239,7 +212,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
tests := []struct {
name string
mlCfg *Config
ballastSize uint64
memAlloc uint64
expectError bool
}{
@ -250,7 +222,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -261,29 +232,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 1800,
expectError: true,
},
{
name: "Below memAllocLimit accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 800,
expectError: false,
},
{
name: "Above memAllocLimit even accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 1800,
expectError: true,
},
@ -294,7 +242,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 10,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -305,7 +252,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 11,
},
ballastSize: 0,
memAlloc: 800,
expectError: true,
},
@ -314,7 +260,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc + tt.ballastSize
ms.Alloc = tt.memAlloc
}
ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg)
@ -330,7 +276,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize}))
assert.NoError(t, tp.Start(ctx, &host{}))
ml.memlimiter.CheckMemLimits()
err = tp.ConsumeTraces(ctx, td)
if tt.expectError {
@ -356,7 +302,6 @@ func TestLogMemoryPressureResponse(t *testing.T) {
tests := []struct {
name string
mlCfg *Config
ballastSize uint64
memAlloc uint64
expectError bool
}{
@ -367,7 +312,6 @@ func TestLogMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -378,29 +322,6 @@ func TestLogMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 0,
memAlloc: 1800,
expectError: true,
},
{
name: "Below memAllocLimit accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 800,
expectError: false,
},
{
name: "Above memAllocLimit even accounting for ballast",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
ballastSize: 1000,
memAlloc: 1800,
expectError: true,
},
@ -411,7 +332,6 @@ func TestLogMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 10,
},
ballastSize: 0,
memAlloc: 800,
expectError: false,
},
@ -422,7 +342,6 @@ func TestLogMemoryPressureResponse(t *testing.T) {
MemoryLimitPercentage: 50,
MemorySpikePercentage: 11,
},
ballastSize: 0,
memAlloc: 800,
expectError: true,
},
@ -431,7 +350,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc + tt.ballastSize
ms.Alloc = tt.memAlloc
}
ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg)
@ -447,7 +366,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize}))
assert.NoError(t, tp.Start(ctx, &host{}))
ml.memlimiter.CheckMemLimits()
err = tp.ConsumeLogs(ctx, ld)
if tt.expectError {
@ -465,26 +384,14 @@ func TestLogMemoryPressureResponse(t *testing.T) {
}
type host struct {
ballastSize uint64
component.Host
}
func (h *host) GetExtensions() map[component.ID]component.Component {
ret := make(map[component.ID]component.Component)
ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize}
return ret
}
type ballastExtension struct {
ballastSize uint64
component.StartFunc
component.ShutdownFunc
}
func (be *ballastExtension) GetBallastSize() uint64 {
return be.ballastSize
}
func totalMemory() (uint64, error) {
return uint64(2048), nil
}

View File

@ -21,7 +21,6 @@ import (
// processMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
type processMetrics struct {
startTimeUnixNano int64
ballastSizeBytes uint64
proc *process.Process
context context.Context
@ -54,7 +53,7 @@ func WithHostProc(hostProc string) RegisterOption {
// RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeBytes uint64, opts ...RegisterOption) error {
func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error {
set := registerOption{}
for _, opt := range opts {
opt.apply(&set)
@ -62,7 +61,6 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeB
var err error
pm := &processMetrics{
startTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
ms: &runtime.MemStats{},
}
@ -139,10 +137,4 @@ func (pm *processMetrics) readMemStatsIfNeeded() {
}
pm.lastMsRead = now
runtime.ReadMemStats(pm.ms)
if pm.ballastSizeBytes > 0 {
pm.ms.Alloc -= pm.ballastSizeBytes
pm.ms.HeapAlloc -= pm.ballastSizeBytes
pm.ms.HeapSys -= pm.ballastSizeBytes
pm.ms.HeapInuse -= pm.ballastSizeBytes
}
}

View File

@ -21,7 +21,7 @@ func TestProcessTelemetryWithHostProc(t *testing.T) {
// Make the sure the environment variable value is not used.
t.Setenv("HOST_PROC", "foo/bar")
require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0, WithHostProc("/proc")))
require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, WithHostProc("/proc")))
// Check that the metrics are actually filled.
time.Sleep(200 * time.Millisecond)

View File

@ -78,7 +78,7 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
func TestProcessTelemetry(t *testing.T) {
tel := setupTelemetry(t)
require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0))
require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings))
mp, err := fetchPrometheusMetrics(tel.promHandler)
require.NoError(t, err)

View File

@ -159,7 +159,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}
}
@ -316,15 +316,6 @@ func (srv *Service) Logger() *zap.Logger {
return srv.telemetrySettings.Logger
}
func getBallastSize(host component.Host) uint64 {
for _, ext := range host.GetExtensions() {
if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok {
return bExt.GetBallastSize()
}
}
return 0
}
func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only