This PR removes APIs that expose `configtelemetry.Level`. Internal functionality still uses the Level, but will soon be changed to use the new Instrument.Enabled.
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent d59bf0a200
commit 8e522ad950
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate `TelemetrySettings.LeveledMeterProvider` and undo deprecation of `TelemetrySettings.MeterProvider`

# One or more tracking issues or pull requests related to the change
issues: [11061]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
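For downstream users the practical migration is the reverse of the earlier v0.108/v0.109 guidance: stop requesting a level-scoped provider and go back to the plain `MeterProvider`. A minimal sketch of the direction of the change, with a made-up scope name and helper (not part of this PR):

```go
package example

import (
	"go.opentelemetry.io/otel/metric"

	"go.opentelemetry.io/collector/component"
)

// newMeter is a hypothetical helper showing the migration: before this change
// a component would call set.LeveledMeterProvider(level).Meter(scope); now it
// obtains the meter from the un-deprecated MeterProvider directly.
func newMeter(set component.TelemetrySettings) metric.Meter {
	return set.MeterProvider.Meter("example.io/collector/examplecomponent")
}
```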
@@ -604,11 +604,11 @@ import (
	"go.opentelemetry.io/collector/config/configtelemetry"
)

// Deprecated: [v0.108.0] use LeveledMeter instead.
func Meter(settings component.TelemetrySettings) metric.Meter {
	return settings.MeterProvider.Meter("")
}

// Deprecated: [v0.114.0] use Meter instead.
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
	return settings.LeveledMeterProvider(level).Meter("")
}
@ -642,11 +642,11 @@ import (
|
|||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() receiver.Settings {
|
||||
set := receivertest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("sample"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,17 +7,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/internal/receiver/samplereceiver")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/internal/receiver/samplereceiver")
|
||||
}
|
||||
|
|
@ -35,7 +36,6 @@ type TelemetryBuilder struct {
|
|||
observeProcessRuntimeTotalAllocBytes func(context.Context, metric.Observer) error
|
||||
QueueLength metric.Int64ObservableGauge
|
||||
RequestDuration metric.Float64Histogram
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -62,7 +62,7 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O
|
|||
// InitQueueLength configures the QueueLength metric.
|
||||
func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) error {
|
||||
var err error
|
||||
builder.QueueLength, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.QueueLength, err = builder.meter.Int64ObservableGauge(
|
||||
"otelcol_queue_length",
|
||||
metric.WithDescription("This metric is optional and therefore not initialized in NewTelemetryBuilder."),
|
||||
metric.WithUnit("{items}"),
|
||||
|
|
@ -70,7 +70,7 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
_, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
o.ObserveInt64(builder.QueueLength, cb(), opts...)
|
||||
return nil
|
||||
}, builder.QueueLength)
|
||||
|
|
@ -80,27 +80,27 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.BatchSizeTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.BatchSizeTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_batch_size_trigger_send",
|
||||
metric.WithDescription("Number of times the batch was sent due to a size trigger [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{times}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessRuntimeTotalAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableCounter(
|
||||
builder.ProcessRuntimeTotalAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableCounter(
|
||||
"otelcol_process_runtime_total_alloc_bytes",
|
||||
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
|
||||
metric.WithUnit("By"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.RequestDuration, err = builder.meters[configtelemetry.LevelBasic].Float64Histogram(
|
||||
builder.RequestDuration, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64Histogram(
|
||||
"otelcol_request_duration",
|
||||
metric.WithDescription("Duration of request [alpha]"),
|
||||
metric.WithUnit("s"),
|
||||
|
|
@@ -109,3 +109,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
	errs = errors.Join(errs, err)
	return &builder, errs
}

func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
	if cfgLevel <= srvLevel {
		return meter
	}
	return noop.Meter{}
}
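The helper above keeps level gating inside the generated builder: when a metric's configured level is not enabled by the service level, the instrument is created on a no-op meter instead of being skipped, so every builder field stays non-nil. A small sketch of the call pattern, with an illustrative counter name:

```go
package example

import (
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/noop"

	"go.opentelemetry.io/collector/config/configtelemetry"
)

// getLeveledMeter mirrors the helper added to the generated code above.
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
	if cfgLevel <= srvLevel {
		return meter
	}
	return noop.Meter{}
}

// newExampleCounter shows how the generated builder uses the helper: the
// counter is always created, but records nothing when the level is disabled.
// The metric name is a placeholder.
func newExampleCounter(meter metric.Meter, srvLevel configtelemetry.Level) (metric.Int64Counter, error) {
	return getLeveledMeter(meter, configtelemetry.LevelBasic, srvLevel).Int64Counter("otelcol_example_counter")
}
```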
@ -31,8 +31,8 @@ type componentTestTelemetry struct {
|
|||
{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
|
||||
func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
|
||||
set := {{ .Status.Class }}test.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("{{ .Type }}"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
{{- end }}
|
||||
|
|
@ -40,6 +40,7 @@ func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
|
|||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,11 +20,11 @@ import (
|
|||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("{{ .ScopeName }}")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("{{ .ScopeName }}")
|
||||
}
|
||||
|
|
@ -44,7 +44,6 @@ type TelemetryBuilder struct {
|
|||
observe{{ $name.Render }} func(context.Context, metric.Observer) error
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -64,7 +63,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// Init{{ $name.Render }} configures the {{ $name.Render }} metric.
|
||||
func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) error {
|
||||
var err error
|
||||
builder.{{ $name.Render }}, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].{{ $metric.Data.Instrument }}(
|
||||
builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}(
|
||||
"otelcol_{{ $name }}",
|
||||
metric.WithDescription("{{ $metric.Description }}"),
|
||||
metric.WithUnit("{{ $metric.Unit }}"),
|
||||
|
|
@ -76,7 +75,7 @@ func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
_, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...)
|
||||
return nil
|
||||
}, builder.{{ $name.Render }})
|
||||
|
|
@ -103,18 +102,16 @@ func With{{ $name.Render }}Callback(cb func() {{ $metric.Data.BasicType }}, opts
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
{{- range $level, $val := .Telemetry.Levels }}
|
||||
builder.meters[configtelemetry.Level{{ casesTitle $level }}] = LeveledMeter(settings, configtelemetry.Level{{ casesTitle $level }})
|
||||
{{- end }}
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
|
||||
{{- range $name, $metric := .Telemetry.Metrics }}
|
||||
{{- if not $metric.Optional }}
|
||||
builder.{{ $name.Render }}, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].{{ $metric.Data.Instrument }}(
|
||||
builder.{{ $name.Render }}, err = getLeveledMeter(builder.meter, configtelemetry.Level{{ $metric.Level }}, settings.MetricsLevel).{{ $metric.Data.Instrument }}(
|
||||
"otelcol_{{ $name }}",
|
||||
metric.WithDescription("{{ $metric.Description }}{{ $metric.Stability }}"),
|
||||
metric.WithUnit("{{ $metric.Unit }}"),
|
||||
|
|
@ -124,7 +121,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
)
|
||||
errs = errors.Join(errs, err)
|
||||
{{- if $metric.Data.Async }}
|
||||
_, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }})
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.Level{{ $metric.Level }}, settings.MetricsLevel).RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }})
|
||||
errs = errors.Join(errs, err)
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
|
@@ -132,4 +129,11 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
	return &builder, errs
}

func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
	if cfgLevel <= srvLevel {
		return meter
	}
	return noop.Meter{}
}

{{- end }}
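To make the template change concrete, this is roughly what the rendered builder looks like for a single metric declared with `level: detailed` in a component's metadata.yaml; the identifiers are illustrative, not from a real component, and the surrounding types come from the rest of the rendered file:

```go
// Sketch of rendered output for a hypothetical "otelcol_example_latency"
// histogram declared with level: detailed.
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
	builder := TelemetryBuilder{}
	for _, op := range options {
		op.apply(&builder)
	}
	builder.meter = Meter(settings)
	var err, errs error
	builder.ExampleLatency, err = getLeveledMeter(builder.meter, configtelemetry.LevelDetailed, settings.MetricsLevel).Float64Histogram(
		"otelcol_example_latency",
		metric.WithDescription("Example latency [alpha]"),
		metric.WithUnit("s"),
	)
	errs = errors.Join(errs, err)
	return &builder, errs
}
```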
@ -5,6 +5,7 @@ package componenttest // import "go.opentelemetry.io/collector/component/compone
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
|
|
@ -12,10 +13,11 @@ import (
|
|||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -32,12 +34,10 @@ const (
|
|||
)
|
||||
|
||||
type TestTelemetry struct {
|
||||
ts component.TelemetrySettings
|
||||
id component.ID
|
||||
ts component.TelemetrySettings
|
||||
SpanRecorder *tracetest.SpanRecorder
|
||||
|
||||
reader *sdkmetric.ManualReader
|
||||
meterProvider *sdkmetric.MeterProvider
|
||||
reader *sdkmetric.ManualReader
|
||||
}
|
||||
|
||||
// CheckExporterTraces checks that for the current exported values for trace exporter metrics match given values.
|
||||
|
|
@@ -101,12 +101,9 @@ func (tts *TestTelemetry) CheckScraperMetrics(receiver component.ID, scraper com

// Shutdown unregisters any views and shuts down the SpanRecorder
func (tts *TestTelemetry) Shutdown(ctx context.Context) error {
	var errs error
	errs = multierr.Append(errs, tts.SpanRecorder.Shutdown(ctx))
	if tts.meterProvider != nil {
		errs = multierr.Append(errs, tts.meterProvider.Shutdown(ctx))
	}
	return errs
	return errors.Join(
		tts.ts.TracerProvider.(*sdktrace.TracerProvider).Shutdown(ctx),
		tts.ts.MeterProvider.(*sdkmetric.MeterProvider).Shutdown(ctx))
}

// TelemetrySettings returns the TestTelemetry's TelemetrySettings

@@ -118,23 +115,26 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings {
// The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods.
// The caller must defer a call to `Shutdown` on the returned TestTelemetry.
func SetupTelemetry(id component.ID) (TestTelemetry, error) {
	sr := new(tracetest.SpanRecorder)
	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

	settings := TestTelemetry{
		ts:           NewNopTelemetrySettings(),
		id:           id,
		SpanRecorder: sr,
		reader:       sdkmetric.NewManualReader(),
		SpanRecorder: new(tracetest.SpanRecorder),
	}
	settings.ts.TracerProvider = tp

	settings.reader = sdkmetric.NewManualReader()
	settings.meterProvider = sdkmetric.NewMeterProvider(
	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithResource(resource.Empty()),
		sdkmetric.WithReader(settings.reader),
	)
	settings.ts.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
		return settings.meterProvider

	settings.ts = component.TelemetrySettings{
		Logger:         zap.NewNop(),
		TracerProvider: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(settings.SpanRecorder)),
		MeterProvider:  mp,
		LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
			return mp
		},
		MetricsLevel: configtelemetry.LevelDetailed,
		Resource:     pcommon.NewResource(),
	}

	return settings, nil
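The caller-facing contract of SetupTelemetry is unchanged by this rewrite: create it, use its TelemetrySettings for the component under test, and shut it down. A minimal test sketch (the component type "example" is made up):

```go
package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
)

func TestWithSetupTelemetry(t *testing.T) {
	tel, err := componenttest.SetupTelemetry(component.NewID(component.MustNewType("example")))
	require.NoError(t, err)
	// Per the contract above, the caller must shut the telemetry down.
	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

	set := tel.TelemetrySettings()
	_ = set // pass these settings to the component under test
}
```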
@@ -22,12 +22,11 @@ type TelemetrySettings struct {
	TracerProvider trace.TracerProvider

	// MeterProvider that the factory can pass to other instrumented third-party libraries.
	//
	// Deprecated [v0.109.0]: use LeveledMeterProvider instead.
	MeterProvider metric.MeterProvider

	// LeveledMeterProvider returns a MeterProvider for a Level that the factory can
	// pass to other instrumented third-party libraries.
	// Deprecated [v0.114.0]: use MeterProvider instead.
	LeveledMeterProvider func(level configtelemetry.Level) metric.MeterProvider

	// MetricsLevel represents the configuration value set when the collector
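With LeveledMeterProvider deprecated again, a component that still needs level-dependent behaviour consults MetricsLevel directly until the planned Instrument.Enabled API is available; Level values are ordered (None < Basic < Normal < Detailed), which is what makes the `<=` comparisons in this PR work. A sketch with a made-up helper name:

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/metric"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configtelemetry"
)

// addIfDetailed is a hypothetical helper: it records only when the collector
// is configured for detailed telemetry, approximating what the deprecated
// LeveledMeterProvider used to encapsulate.
func addIfDetailed(ctx context.Context, set component.TelemetrySettings, counter metric.Int64Counter) {
	if set.MetricsLevel >= configtelemetry.LevelDetailed {
		counter.Add(ctx, 1)
	}
}
```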
@ -16,6 +16,8 @@ import (
|
|||
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
|
@ -337,7 +339,7 @@ func (gcs *ClientConfig) getGrpcDialOptions(
|
|||
otelOpts := []otelgrpc.Option{
|
||||
otelgrpc.WithTracerProvider(settings.TracerProvider),
|
||||
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
|
||||
otelgrpc.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)),
|
||||
otelgrpc.WithMeterProvider(getLeveledMeterProvider(settings)),
|
||||
}
|
||||
|
||||
// Enable OpenTelemetry observability plugin.
|
||||
|
|
@ -481,7 +483,7 @@ func (gss *ServerConfig) getGrpcServerOptions(
|
|||
otelOpts := []otelgrpc.Option{
|
||||
otelgrpc.WithTracerProvider(settings.TracerProvider),
|
||||
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
|
||||
otelgrpc.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)),
|
||||
otelgrpc.WithMeterProvider(getLeveledMeterProvider(settings)),
|
||||
}
|
||||
|
||||
// Enable OpenTelemetry observability plugin.
|
||||
|
|
@@ -575,3 +577,10 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre
	return handler(srv, wrapServerStream(ctx, stream))
}

func getLeveledMeterProvider(settings component.TelemetrySettings) metric.MeterProvider {
	if configtelemetry.LevelDetailed <= settings.MetricsLevel {
		return settings.MeterProvider
	}
	return noop.MeterProvider{}
}
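On the gRPC side, the helper above feeds otelgrpc so that detailed-level RPC metrics quietly become no-ops when the collector runs at a lower metrics level. A trimmed sketch of how the client options are assembled (mirroring getGrpcDialOptions in this file; the stats-handler wiring is an assumption about the surrounding code, not shown in this hunk):

```go
package example

import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel"
	"google.golang.org/grpc"

	"go.opentelemetry.io/collector/component"
)

// dialOptions sketches the instrumentation wiring; getLeveledMeterProvider is
// the unexported helper added above, referenced here for illustration only.
func dialOptions(settings component.TelemetrySettings) []grpc.DialOption {
	otelOpts := []otelgrpc.Option{
		otelgrpc.WithTracerProvider(settings.TracerProvider),
		otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
		otelgrpc.WithMeterProvider(getLeveledMeterProvider(settings)),
	}
	return []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...))}
}
```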
@ -20,6 +20,7 @@ require (
|
|||
go.opentelemetry.io/collector/pdata/testdata v0.113.0
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0
|
||||
go.opentelemetry.io/otel v1.32.0
|
||||
go.opentelemetry.io/otel/metric v1.32.0
|
||||
go.uber.org/goleak v1.3.0
|
||||
go.uber.org/zap v1.27.0
|
||||
google.golang.org/grpc v1.67.1
|
||||
|
|
@ -40,7 +41,6 @@ require (
|
|||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/collector/extension v0.113.0 // indirect
|
||||
go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.32.0 // indirect
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ import (
|
|||
"github.com/rs/cors"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/publicsuffix"
|
||||
|
||||
|
|
@ -226,7 +228,7 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett
|
|||
otelOpts := []otelhttp.Option{
|
||||
otelhttp.WithTracerProvider(settings.TracerProvider),
|
||||
otelhttp.WithPropagators(otel.GetTextMapPropagator()),
|
||||
otelhttp.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)),
|
||||
otelhttp.WithMeterProvider(getLeveledMeterProvider(settings)),
|
||||
}
|
||||
// wrapping http transport with otelhttp transport to enable otel instrumentation
|
||||
if settings.TracerProvider != nil && settings.MeterProvider != nil {
|
||||
|
|
@ -465,7 +467,7 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
|
|||
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
|
||||
return r.URL.Path
|
||||
}),
|
||||
otelhttp.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)),
|
||||
otelhttp.WithMeterProvider(getLeveledMeterProvider(settings)),
|
||||
}
|
||||
|
||||
// Enable OpenTelemetry observability plugin.
|
||||
|
|
@@ -553,3 +555,10 @@ func maxRequestBodySizeInterceptor(next http.Handler, maxRecvSize int64) http.Ha
		next.ServeHTTP(w, r)
	})
}

func getLeveledMeterProvider(settings component.TelemetrySettings) metric.MeterProvider {
	if configtelemetry.LevelDetailed <= settings.MetricsLevel {
		return settings.MeterProvider
	}
	return noop.MeterProvider{}
}
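confighttp applies the same gate to otelhttp; a trimmed sketch of wrapping a client transport with the gated provider (again, getLeveledMeterProvider is the unexported helper added above and is referenced here only for illustration):

```go
package example

import (
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"

	"go.opentelemetry.io/collector/component"
)

// wrapTransport sketches what ToClient does: detailed HTTP client metrics are
// emitted only when the collector's metrics level allows it.
func wrapTransport(base http.RoundTripper, settings component.TelemetrySettings) http.RoundTripper {
	return otelhttp.NewTransport(base,
		otelhttp.WithTracerProvider(settings.TracerProvider),
		otelhttp.WithPropagators(otel.GetTextMapPropagator()),
		otelhttp.WithMeterProvider(getLeveledMeterProvider(settings)),
	)
}
```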
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() exporter.Settings {
|
||||
set := exportertest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("exporterhelper"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,17 +7,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/exporter/exporterhelper")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/exporter/exporterhelper")
|
||||
}
|
||||
|
|
@ -41,7 +42,6 @@ type TelemetryBuilder struct {
|
|||
ExporterSentLogRecords metric.Int64Counter
|
||||
ExporterSentMetricPoints metric.Int64Counter
|
||||
ExporterSentSpans metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -58,7 +58,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// InitExporterQueueCapacity configures the ExporterQueueCapacity metric.
|
||||
func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) error {
|
||||
var err error
|
||||
builder.ExporterQueueCapacity, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge(
|
||||
"otelcol_exporter_queue_capacity",
|
||||
metric.WithDescription("Fixed capacity of the retry queue (in batches)"),
|
||||
metric.WithUnit("{batches}"),
|
||||
|
|
@ -66,7 +66,7 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
_, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...)
|
||||
return nil
|
||||
}, builder.ExporterQueueCapacity)
|
||||
|
|
@ -76,7 +76,7 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts
|
|||
// InitExporterQueueSize configures the ExporterQueueSize metric.
|
||||
func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) error {
|
||||
var err error
|
||||
builder.ExporterQueueSize, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge(
|
||||
"otelcol_exporter_queue_size",
|
||||
metric.WithDescription("Current size of the retry queue (in batches)"),
|
||||
metric.WithUnit("{batches}"),
|
||||
|
|
@ -84,7 +84,7 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
_, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
||||
o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...)
|
||||
return nil
|
||||
}, builder.ExporterQueueSize)
|
||||
|
|
@ -94,61 +94,61 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ExporterEnqueueFailedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterEnqueueFailedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_enqueue_failed_log_records",
|
||||
metric.WithDescription("Number of log records failed to be added to the sending queue."),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterEnqueueFailedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterEnqueueFailedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_enqueue_failed_metric_points",
|
||||
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterEnqueueFailedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterEnqueueFailedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_enqueue_failed_spans",
|
||||
metric.WithDescription("Number of spans failed to be added to the sending queue."),
|
||||
metric.WithUnit("{spans}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSendFailedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSendFailedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_send_failed_log_records",
|
||||
metric.WithDescription("Number of log records in failed attempts to send to destination."),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSendFailedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSendFailedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_send_failed_metric_points",
|
||||
metric.WithDescription("Number of metric points in failed attempts to send to destination."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSendFailedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSendFailedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_send_failed_spans",
|
||||
metric.WithDescription("Number of spans in failed attempts to send to destination."),
|
||||
metric.WithUnit("{spans}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSentLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSentLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_sent_log_records",
|
||||
metric.WithDescription("Number of log record successfully sent to destination."),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSentMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSentMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_sent_metric_points",
|
||||
metric.WithDescription("Number of metric points successfully sent to destination."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ExporterSentSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ExporterSentSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_exporter_sent_spans",
|
||||
metric.WithDescription("Number of spans successfully sent to destination."),
|
||||
metric.WithUnit("{spans}"),
|
||||
|
|
@ -156,3 +156,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
|
||||
|
|
|
|||
|
|
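A consequence of falling back to noop.Meter{} rather than skipping instrument creation is that every field on the generated TelemetryBuilder stays usable, so call sites never need nil checks. A tiny illustration of the no-op behaviour this relies on:

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/metric/noop"
)

func noopInstrumentsAreSafe(ctx context.Context) {
	// Instruments created on a no-op meter are valid and simply record nothing.
	counter, err := noop.Meter{}.Int64Counter("otelcol_example")
	if err == nil {
		counter.Add(ctx, 1) // safe, goes nowhere
	}
}
```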
@ -19,7 +19,6 @@ import (
|
|||
|
||||
"go.opentelemetry.io/collector/client"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/consumer/consumererror"
|
||||
"go.opentelemetry.io/collector/consumer/consumertest"
|
||||
|
|
@@ -85,11 +84,9 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
	sink := new(consumertest.TracesSink)
	cfg := createDefaultConfig().(*Config)
	cfg.SendBatchSize = 128
	creationSet := processortest.NewNopSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
	traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
	require.NoError(t, err)
	require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
	require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))

	requestCount := 1000
	spansPerRequest := 100
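The batch processor tests now go through the public factory instead of the unexported constructors, which keeps them on the supported surface while the internals move to the new telemetry plumbing. A minimal sketch of the pattern used throughout this file:

```go
package batchprocessor

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/processor/processortest"
)

func TestCreateTracesViaFactory(t *testing.T) {
	sink := new(consumertest.TracesSink)
	cfg := createDefaultConfig().(*Config) // Config and createDefaultConfig live in this package
	cfg.SendBatchSize = 128

	traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
	require.NoError(t, err)
	require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
	require.NoError(t, traces.Shutdown(context.Background()))
}
```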
@ -101,14 +98,14 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
|
|||
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
|
||||
}
|
||||
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
// Added to test logic that check for empty resources.
|
||||
td := ptrace.NewTraces()
|
||||
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
assert.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
|
||||
receivedTraces := sink.AllTraces()
|
||||
|
|
@ -128,11 +125,9 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
|
|||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.SendBatchSize = 128
|
||||
cfg.SendBatchMaxSize = 130
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
requestCount := 1000
|
||||
spansPerRequest := 150
|
||||
|
|
@ -142,12 +137,12 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
|
|||
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
|
||||
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
|
||||
}
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
// Added to test logic that check for empty resources.
|
||||
td := ptrace.NewTraces()
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
|
||||
// wait for all spans to be reported
|
||||
for {
|
||||
|
|
@ -157,7 +152,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
|
|||
<-time.After(cfg.Timeout)
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
|
||||
for i := 0; i < len(sink.AllTraces())-1; i++ {
|
||||
|
|
@ -182,21 +177,20 @@ func TestBatchProcessorSentBySize(t *testing.T) {
|
|||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.SendBatchSize = sendBatchSize
|
||||
cfg.Timeout = 500 * time.Millisecond
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), tel.NewSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
sizeSum := 0
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
td := testdata.GenerateTraces(spansPerRequest)
|
||||
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
elapsed := time.Since(start)
|
||||
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
|
||||
|
|
@ -285,6 +279,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, tel.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
|
||||
|
|
@ -303,21 +298,20 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
|
|||
cfg.SendBatchSize = uint32(sendBatchSize)
|
||||
cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
|
||||
cfg.Timeout = 500 * time.Millisecond
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), tel.NewSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
|
||||
sizeSum := 0
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
td := testdata.GenerateTraces(spansPerRequest)
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
elapsed := time.Since(start)
|
||||
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
|
||||
|
|
@ -425,6 +419,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, tel.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestBatchProcessorSentByTimeout(t *testing.T) {
|
||||
|
|
@ -438,15 +433,13 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
|
|||
spansPerRequest := 10
|
||||
start := time.Now()
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
td := testdata.GenerateTraces(spansPerRequest)
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
// Wait for at least one batch to be sent.
|
||||
|
|
@ -461,7 +454,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
|
|||
require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds())
|
||||
|
||||
// This should not change the results in the sink, verified by the expectedBatchesNum
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
expectedBatchesNum := 1
|
||||
expectedBatchingFactor := 5
|
||||
|
|
@ -479,26 +472,24 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 3 * time.Second,
|
||||
SendBatchSize: 1000,
|
||||
}
|
||||
sink := new(consumertest.TracesSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, &cfg)
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
requestCount := 10
|
||||
spansPerRequest := 10
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
td := testdata.GenerateTraces(spansPerRequest)
|
||||
require.NoError(t, batcher.ConsumeTraces(context.Background(), td))
|
||||
require.NoError(t, traces.ConsumeTraces(context.Background(), td))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
|
||||
require.Len(t, sink.AllTraces(), 1)
|
||||
|
|
@ -507,7 +498,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
|
|||
func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
|
||||
// Instantiate the batch processor with low config values to test data
|
||||
// gets sent through the processor.
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 200 * time.Millisecond,
|
||||
SendBatchSize: 50,
|
||||
}
|
||||
|
|
@ -516,38 +507,36 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
|
|||
metricsPerRequest := 5
|
||||
sink := new(consumertest.MetricsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics()
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
md := testdata.GenerateMetrics(metricsPerRequest)
|
||||
metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
|
||||
ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
|
||||
for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ {
|
||||
metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex))
|
||||
ms.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex))
|
||||
}
|
||||
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
|
||||
require.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
|
||||
require.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
}
|
||||
|
||||
// Added to test case with empty resources sent.
|
||||
md := pmetric.NewMetrics()
|
||||
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
|
||||
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, metrics.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount())
|
||||
receivedMds := sink.AllMetrics()
|
||||
metricsReceivedByName := metricsReceivedByName(receivedMds)
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
metrics := sentResourceMetrics.At(requestNum).ScopeMetrics().At(0).Metrics()
|
||||
ms := sentResourceMetrics.At(requestNum).ScopeMetrics().At(0).Metrics()
|
||||
for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ {
|
||||
require.EqualValues(t,
|
||||
metrics.At(metricIndex),
|
||||
ms.At(metricIndex),
|
||||
metricsReceivedByName[getTestMetricName(requestNum, metricIndex)])
|
||||
}
|
||||
}
|
||||
|
|
@ -559,7 +548,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
|
|||
|
||||
// Instantiate the batch processor with low config values to test data
|
||||
// gets sent through the processor.
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 50,
|
||||
}
|
||||
|
|
@ -572,20 +561,18 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
|
|||
)
|
||||
sink := new(consumertest.MetricsSink)
|
||||
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), tel.NewSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
size := 0
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
md := testdata.GenerateMetrics(metricsPerRequest)
|
||||
size += sizer.MetricsSize(md)
|
||||
require.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
|
||||
require.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
}
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, metrics.Shutdown(context.Background()))
|
||||
|
||||
elapsed := time.Since(start)
|
||||
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
|
||||
|
|
@ -675,6 +662,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, tel.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
|
||||
|
|
@ -698,7 +686,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBatchMetricsProcessor_Timeout(t *testing.T) {
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 101,
|
||||
}
|
||||
|
|
@ -706,16 +694,14 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
|
|||
metricsPerRequest := 10
|
||||
sink := new(consumertest.MetricsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
md := testdata.GenerateMetrics(metricsPerRequest)
|
||||
require.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
|
||||
require.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
}
|
||||
|
||||
// Wait for at least one batch to be sent.
|
||||
|
|
@ -730,7 +716,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
|
|||
require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds())
|
||||
|
||||
// This should not change the results in the sink, verified by the expectedBatchesNum
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, metrics.Shutdown(context.Background()))
|
||||
|
||||
expectedBatchesNum := 1
|
||||
expectedBatchingFactor := 5
|
||||
|
|
@ -747,7 +733,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBatchMetricProcessor_Shutdown(t *testing.T) {
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 3 * time.Second,
|
||||
SendBatchSize: 1000,
|
||||
}
|
||||
|
|
@ -755,18 +741,16 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {
|
|||
metricsPerRequest := 10
|
||||
sink := new(consumertest.MetricsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
md := testdata.GenerateMetrics(metricsPerRequest)
|
||||
require.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
|
||||
require.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, metrics.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount())
|
||||
require.Len(t, sink.AllMetrics(), 1)
|
||||
|
|
@ -833,7 +817,7 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
|
|||
|
||||
func BenchmarkBatchMetricProcessor(b *testing.B) {
|
||||
b.StopTimer()
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 2000,
|
||||
}
|
||||
|
|
@ -842,7 +826,7 @@ func BenchmarkBatchMetricProcessor(b *testing.B) {
|
|||
|
||||
func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
|
||||
b.StopTimer()
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 2000,
|
||||
MetadataKeys: []string{"test", "test2"},
|
||||
|
|
@ -850,24 +834,22 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
|
|||
runMetricsProcessorBenchmark(b, cfg)
|
||||
}
|
||||
|
||||
func runMetricsProcessorBenchmark(b *testing.B, cfg Config) {
|
||||
func runMetricsProcessorBenchmark(b *testing.B, cfg *Config) {
|
||||
ctx := context.Background()
|
||||
sink := new(metricsSink)
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
metricsPerRequest := 1000
|
||||
batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg)
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(b, err)
|
||||
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))
|
||||
require.NoError(b, metrics.Start(ctx, componenttest.NewNopHost()))
|
||||
|
||||
const metricsPerRequest = 1000
|
||||
b.StartTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
require.NoError(b, batcher.ConsumeMetrics(ctx, testdata.GenerateMetrics(metricsPerRequest)))
|
||||
require.NoError(b, metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(metricsPerRequest)))
|
||||
}
|
||||
})
|
||||
b.StopTimer()
|
||||
require.NoError(b, batcher.Shutdown(ctx))
|
||||
require.NoError(b, metrics.Shutdown(ctx))
|
||||
require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
|
||||
}
|
||||
|
||||
|
|
@ -892,7 +874,7 @@ func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) er
|
|||
func TestBatchLogProcessor_ReceivingData(t *testing.T) {
|
||||
// Instantiate the batch processor with low config values to test data
|
||||
// gets sent through the processor.
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 200 * time.Millisecond,
|
||||
SendBatchSize: 50,
|
||||
}
|
||||
|
|
@ -901,38 +883,36 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
|
|||
logsPerRequest := 5
|
||||
sink := new(consumertest.LogsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
sentResourceLogs := plog.NewLogs().ResourceLogs()
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
ld := testdata.GenerateLogs(logsPerRequest)
|
||||
logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
|
||||
lrs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
|
||||
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
|
||||
logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex))
|
||||
lrs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex))
|
||||
}
|
||||
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
|
||||
// Added to test case with empty resources sent.
|
||||
ld := plog.NewLogs()
|
||||
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
assert.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, logs.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount())
|
||||
receivedMds := sink.AllLogs()
|
||||
logsReceivedBySeverityText := logsReceivedBySeverityText(receivedMds)
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
logs := sentResourceLogs.At(requestNum).ScopeLogs().At(0).LogRecords()
|
||||
lrs := sentResourceLogs.At(requestNum).ScopeLogs().At(0).LogRecords()
|
||||
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
|
||||
require.EqualValues(t,
|
||||
logs.At(logIndex),
|
||||
lrs.At(logIndex),
|
||||
logsReceivedBySeverityText[getTestLogSeverityText(requestNum, logIndex)])
|
||||
}
|
||||
}
|
||||
|
|
@ -944,7 +924,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
|
|||
|
||||
// Instantiate the batch processor with low config values to test data
|
||||
// gets sent through the processor.
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 50,
|
||||
}
|
||||
|
|
@ -955,20 +935,18 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
|
|||
)
|
||||
sink := new(consumertest.LogsSink)
|
||||
|
||||
creationSet := tel.NewSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), tel.NewSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
size := 0
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
ld := testdata.GenerateLogs(logsPerRequest)
|
||||
size += sizer.LogsSize(ld)
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, logs.Shutdown(context.Background()))
|
||||
|
||||
elapsed := time.Since(start)
|
||||
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
|
||||
|
|
@ -1058,10 +1036,11 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, tel.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestBatchLogsProcessor_Timeout(t *testing.T) {
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
SendBatchSize: 100,
|
||||
}
|
||||
|
|
@ -1069,16 +1048,14 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
|
|||
logsPerRequest := 10
|
||||
sink := new(consumertest.LogsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
start := time.Now()
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
ld := testdata.GenerateLogs(logsPerRequest)
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
|
||||
// Wait for at least one batch to be sent.
|
||||
|
|
@ -1093,7 +1070,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
|
|||
require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds())
|
||||
|
||||
// This should not change the results in the sink, verified by the expectedBatchesNum
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, logs.Shutdown(context.Background()))
|
||||
|
||||
expectedBatchesNum := 1
|
||||
expectedBatchingFactor := 5
|
||||
|
|
@ -1110,7 +1087,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBatchLogProcessor_Shutdown(t *testing.T) {
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
Timeout: 3 * time.Second,
|
||||
SendBatchSize: 1000,
|
||||
}
|
||||
|
|
@ -1118,18 +1095,16 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {
|
|||
logsPerRequest := 10
|
||||
sink := new(consumertest.LogsSink)
|
||||
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
ld := testdata.GenerateLogs(logsPerRequest)
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, logs.Shutdown(context.Background()))
|
||||
|
||||
require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount())
|
||||
require.Len(t, sink.AllLogs(), 1)
|
||||
|
|
@ -1197,11 +1172,9 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
|
|||
cfg.SendBatchSize = 1000
|
||||
cfg.Timeout = 10 * time.Minute
|
||||
cfg.MetadataKeys = []string{"token1", "token2"}
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
bg := context.Background()
|
||||
callCtxs := []context.Context{
|
||||
|
|
@ -1248,10 +1221,10 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
|
|||
// use round-robin to assign context.
|
||||
num := requestNum % len(callCtxs)
|
||||
expectByContext[num] += spansPerRequest
|
||||
require.NoError(t, batcher.ConsumeTraces(callCtxs[num], td))
|
||||
require.NoError(t, traces.ConsumeTraces(callCtxs[num], td))
|
||||
}
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
|
||||
// The following tests are the same as TestBatchProcessorSpansDelivered().
|
||||
require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
|
||||
|
|
@ -1290,10 +1263,9 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
|
|||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.MetadataKeys = []string{"token"}
|
||||
cfg.MetadataCardinalityLimit = cardLimit
|
||||
creationSet := processortest.NewNopSettings()
|
||||
batcher, err := newTracesBatchProcessor(creationSet, sink, cfg)
|
||||
traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
bg := context.Background()
|
||||
for requestNum := 0; requestNum < cardLimit; requestNum++ {
|
||||
|
|
@ -1304,7 +1276,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
|
|||
}),
|
||||
})
|
||||
|
||||
require.NoError(t, batcher.ConsumeTraces(ctx, td))
|
||||
require.NoError(t, traces.ConsumeTraces(ctx, td))
|
||||
}
|
||||
|
||||
td := testdata.GenerateTraces(1)
|
||||
|
|
@ -1313,38 +1285,36 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
|
|||
"token": {"limit_exceeded"},
|
||||
}),
|
||||
})
|
||||
err = batcher.ConsumeTraces(ctx, td)
|
||||
err = traces.ConsumeTraces(ctx, td)
|
||||
|
||||
require.Error(t, err)
|
||||
assert.True(t, consumererror.IsPermanent(err))
|
||||
require.ErrorContains(t, err, "too many")
|
||||
|
||||
require.NoError(t, batcher.Shutdown(context.Background()))
|
||||
require.NoError(t, traces.Shutdown(context.Background()))
|
||||
}
|
||||
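The cardinality-limit test above drives the processor with per-request client metadata; each distinct value of a configured metadata key creates a separate batcher until MetadataCardinalityLimit is reached. A hedged sketch of how such a context is typically built with the collector's client package (the key name mirrors the test; the program itself is illustrative):

```go
// Sketch: attaching client metadata to a context so the batch processor can
// partition batches by the configured metadata keys.
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/client"
)

func main() {
	ctx := client.NewContext(context.Background(), client.Info{
		Metadata: client.NewMetadata(map[string][]string{
			"token": {"tenant-a"}, // one distinct value => one batcher instance
		}),
	})
	info := client.FromContext(ctx)
	fmt.Println(info.Metadata.Get("token"))
}
```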
|
||||
func TestBatchZeroConfig(t *testing.T) {
|
||||
// This is a no-op configuration. No need for a timer, no
|
||||
// minimum, no maximum, just a pass through.
|
||||
cfg := Config{}
|
||||
cfg := &Config{}
|
||||
|
||||
require.NoError(t, cfg.Validate())
|
||||
|
||||
const requestCount = 5
|
||||
const logsPerRequest = 10
|
||||
sink := new(consumertest.LogsSink)
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, logs.Shutdown(context.Background())) }()
|
||||
|
||||
expect := 0
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
cnt := logsPerRequest + requestNum
|
||||
expect += cnt
|
||||
ld := testdata.GenerateLogs(cnt)
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
|
||||
// Wait for all batches.
|
||||
|
|
@ -1366,23 +1336,21 @@ func TestBatchSplitOnly(t *testing.T) {
|
|||
const requestCount = 5
|
||||
const logsPerRequest = 100
|
||||
|
||||
cfg := Config{
|
||||
cfg := &Config{
|
||||
SendBatchMaxSize: maxBatch,
|
||||
}
|
||||
|
||||
require.NoError(t, cfg.Validate())
|
||||
|
||||
sink := new(consumertest.LogsSink)
|
||||
creationSet := processortest.NewNopSettings()
|
||||
creationSet.MetricsLevel = configtelemetry.LevelDetailed
|
||||
batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg)
|
||||
logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
|
||||
require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
|
||||
defer func() { require.NoError(t, logs.Shutdown(context.Background())) }()
|
||||
|
||||
for requestNum := 0; requestNum < requestCount; requestNum++ {
|
||||
ld := testdata.GenerateLogs(logsPerRequest)
|
||||
require.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
|
||||
require.NoError(t, logs.ConsumeLogs(context.Background(), ld))
|
||||
}
|
||||
|
||||
// Wait for all batches.
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() processor.Settings {
|
||||
set := processortest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("batch"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
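In the test helpers, the nop TelemetrySettings now get a real SDK MeterProvider plus MetricsLevel set to detailed, rather than only a LeveledMeterProvider closure. A minimal sketch of wiring that up with an in-memory reader, assuming the OTel SDK's ManualReader (the helper name is illustrative; the deprecated field is kept only for as long as it exists in the API):

```go
// Sketch: building component.TelemetrySettings for tests with an in-memory
// metric reader, mirroring the newTelemetrySettings change above.
package testutil

import (
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config/configtelemetry"
)

func newTestTelemetrySettings() (component.TelemetrySettings, *sdkmetric.ManualReader) {
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

	set := componenttest.NewNopTelemetrySettings()
	set.MeterProvider = provider                     // metrics recorded here can be read back in the test
	set.MetricsLevel = configtelemetry.LevelDetailed // enable detailed-level instruments
	// Kept only while the deprecated LeveledMeterProvider is still in the API.
	set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return provider }
	return set, reader
}
```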
|
||||
|
|
|
|||
|
|
@ -7,17 +7,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/batchprocessor")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/batchprocessor")
|
||||
}
|
||||
|
|
@ -36,7 +37,6 @@ type TelemetryBuilder struct {
|
|||
ProcessorBatchMetadataCardinality metric.Int64ObservableUpDownCounter
|
||||
observeProcessorBatchMetadataCardinality func(context.Context, metric.Observer) error
|
||||
ProcessorBatchTimeoutTriggerSend metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -63,42 +63,41 @@ func WithProcessorBatchMetadataCardinalityCallback(cb func() int64, opts ...metr
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meters[configtelemetry.LevelDetailed] = LeveledMeter(settings, configtelemetry.LevelDetailed)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ProcessorBatchBatchSendSize, err = builder.meters[configtelemetry.LevelBasic].Int64Histogram(
|
||||
builder.ProcessorBatchBatchSendSize, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Histogram(
|
||||
"otelcol_processor_batch_batch_send_size",
|
||||
metric.WithDescription("Number of units in the batch"),
|
||||
metric.WithUnit("{units}"),
|
||||
metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}...),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorBatchBatchSendSizeBytes, err = builder.meters[configtelemetry.LevelDetailed].Int64Histogram(
|
||||
builder.ProcessorBatchBatchSendSizeBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelDetailed, settings.MetricsLevel).Int64Histogram(
|
||||
"otelcol_processor_batch_batch_send_size_bytes",
|
||||
metric.WithDescription("Number of bytes in batch that was sent"),
|
||||
metric.WithUnit("By"),
|
||||
metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1e+06, 2e+06, 3e+06, 4e+06, 5e+06, 6e+06, 7e+06, 8e+06, 9e+06}...),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorBatchBatchSizeTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorBatchBatchSizeTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_batch_batch_size_trigger_send",
|
||||
metric.WithDescription("Number of times the batch was sent due to a size trigger"),
|
||||
metric.WithUnit("{times}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorBatchMetadataCardinality, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableUpDownCounter(
|
||||
builder.ProcessorBatchMetadataCardinality, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableUpDownCounter(
|
||||
"otelcol_processor_batch_metadata_cardinality",
|
||||
metric.WithDescription("Number of distinct metadata value combinations being processed"),
|
||||
metric.WithUnit("{combinations}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorBatchTimeoutTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorBatchTimeoutTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_batch_timeout_trigger_send",
|
||||
metric.WithDescription("Number of times the batch was sent due to a timeout trigger"),
|
||||
metric.WithUnit("{times}"),
|
||||
|
|
@ -106,3 +105,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
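getLeveledMeter gates instrument creation by comparing the level a metric is declared at against the level the service is configured with; configtelemetry.Level values are ordered (None < Basic < Normal < Detailed), so cfgLevel <= srvLevel means the configured verbosity is at least as high as what the metric requires. A small sketch of the same idea outside the generated code (function and variable names here are illustrative):

```go
// Sketch: level-gated meter selection, the pattern used by getLeveledMeter.
// When the service runs at a lower verbosity than the metric requires,
// instruments are created on a no-op meter and record nothing.
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/noop"

	"go.opentelemetry.io/collector/config/configtelemetry"
)

func meterFor(meter metric.Meter, required, configured configtelemetry.Level) metric.Meter {
	if required <= configured {
		return meter
	}
	return noop.Meter{}
}

func main() {
	real := noop.NewMeterProvider().Meter("example") // stand-in for a real meter
	// A detailed-level histogram is dropped when the service runs at basic level.
	m := meterFor(real, configtelemetry.LevelDetailed, configtelemetry.LevelBasic)
	fmt.Printf("%T\n", m) // noop.Meter
}
```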
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@ github_project: open-telemetry/opentelemetry-collector
|
|||
status:
|
||||
class: processor
|
||||
stability:
|
||||
beta: [traces, metrics, logs]
|
||||
distributions: [core, contrib, k8s]
|
||||
beta: [ traces, metrics, logs ]
|
||||
distributions: [ core, contrib, k8s ]
|
||||
|
||||
tests:
|
||||
|
||||
|
|
@ -32,7 +32,7 @@ telemetry:
|
|||
unit: "{units}"
|
||||
histogram:
|
||||
value_type: int
|
||||
bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000]
|
||||
bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000 ]
|
||||
processor_batch_batch_send_size_bytes:
|
||||
level: detailed
|
||||
enabled: true
|
||||
|
|
@ -40,7 +40,7 @@ telemetry:
|
|||
unit: By
|
||||
histogram:
|
||||
value_type: int
|
||||
bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000]
|
||||
bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000 ]
|
||||
processor_batch_metadata_cardinality:
|
||||
enabled: true
|
||||
description: Number of distinct metadata value combinations being processed
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() processor.Settings {
|
||||
set := processortest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("memory_limiter"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,17 +6,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/memorylimiterprocessor")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/memorylimiterprocessor")
|
||||
}
|
||||
|
|
@ -35,7 +36,6 @@ type TelemetryBuilder struct {
|
|||
ProcessorRefusedLogRecords metric.Int64Counter
|
||||
ProcessorRefusedMetricPoints metric.Int64Counter
|
||||
ProcessorRefusedSpans metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -52,43 +52,43 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ProcessorAcceptedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorAcceptedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_accepted_log_records",
|
||||
metric.WithDescription("Number of log records successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorAcceptedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorAcceptedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_accepted_metric_points",
|
||||
metric.WithDescription("Number of metric points successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorAcceptedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorAcceptedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_accepted_spans",
|
||||
metric.WithDescription("Number of spans successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{spans}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorRefusedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_refused_log_records",
|
||||
metric.WithDescription("Number of log records that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorRefusedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorRefusedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_refused_metric_points",
|
||||
metric.WithDescription("Number of metric points that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorRefusedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorRefusedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_refused_spans",
|
||||
metric.WithDescription("Number of spans that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"),
|
||||
metric.WithUnit("{spans}"),
|
||||
|
|
@ -96,3 +96,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
|
|
@ -205,6 +207,74 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestMetricsTelemetry(t *testing.T) {
|
||||
tel := setupTestTelemetry()
|
||||
cfg := &Config{
|
||||
CheckInterval: time.Second,
|
||||
MemoryLimitPercentage: 50,
|
||||
MemorySpikePercentage: 10,
|
||||
}
|
||||
metrics, err := NewFactory().CreateMetrics(context.Background(), tel.NewSettings(), cfg, consumertest.NewNop())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost()))
|
||||
|
||||
md := pmetric.NewMetrics()
|
||||
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
|
||||
for requestNum := 0; requestNum < 10; requestNum++ {
|
||||
require.NoError(t, metrics.ConsumeMetrics(context.Background(), md))
|
||||
}
|
||||
require.NoError(t, metrics.Shutdown(context.Background()))
|
||||
|
||||
tel.assertMetrics(t, []metricdata.Metrics{
|
||||
{
|
||||
Name: "otelcol_processor_accepted_metric_points",
|
||||
Description: "Number of metric points successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]",
|
||||
Unit: "{datapoints}",
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Value: 10,
|
||||
Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter")),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "otelcol_processor_incoming_items",
|
||||
Description: "Number of items passed to the processor. [alpha]",
|
||||
Unit: "{items}",
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Value: 10,
|
||||
Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter"), attribute.String("otel.signal", "metrics")),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "otelcol_processor_outgoing_items",
|
||||
Description: "Number of items emitted from the processor. [alpha]",
|
||||
Unit: "{items}",
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Value: 10,
|
||||
Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter"), attribute.String("otel.signal", "metrics")),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, tel.Shutdown(context.Background()))
|
||||
}
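The assertMetrics helper used in TestMetricsTelemetry compares collected metric data against the expected metricdata.Metrics. Its internals are not shown in this diff; the following is a hedged sketch of how such a comparison is usually done with a ManualReader and the SDK's metricdatatest helpers (the function name and reader plumbing are assumptions):

```go
// Sketch: collecting and comparing one metric the way assertMetrics presumably
// does, using the OTel SDK's ManualReader and metricdatatest helpers.
package telemetrytest

import (
	"context"
	"testing"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

func assertMetric(t *testing.T, reader *sdkmetric.ManualReader, want metricdata.Metrics) {
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		t.Fatal(err)
	}
	for _, sm := range rm.ScopeMetrics {
		for _, got := range sm.Metrics {
			if got.Name == want.Name {
				// Ignore timestamps; value and attributes must match.
				metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
				return
			}
		}
	}
	t.Fatalf("metric %q not found", want.Name)
}
```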
|
||||
|
||||
// TestTraceMemoryPressureResponse manipulates results from querying memory and
|
||||
// checks expected side effects.
|
||||
func TestTraceMemoryPressureResponse(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() processor.Settings {
|
||||
set := processortest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("processorhelper"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,17 +6,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/processorhelper")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/processorhelper")
|
||||
}
|
||||
|
|
@ -31,7 +32,6 @@ type TelemetryBuilder struct {
|
|||
meter metric.Meter
|
||||
ProcessorIncomingItems metric.Int64Counter
|
||||
ProcessorOutgoingItems metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -48,19 +48,19 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ProcessorIncomingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorIncomingItems, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_incoming_items",
|
||||
metric.WithDescription("Number of items passed to the processor. [alpha]"),
|
||||
metric.WithUnit("{items}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessorOutgoingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ProcessorOutgoingItems, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_processor_outgoing_items",
|
||||
metric.WithDescription("Number of items emitted from the processor. [alpha]"),
|
||||
metric.WithUnit("{items}"),
|
||||
|
|
@ -68,3 +68,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() receiver.Settings {
|
||||
set := receivertest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("receiverhelper"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,17 +6,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/receiver/receiverhelper")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/receiver/receiverhelper")
|
||||
}
|
||||
|
|
@ -35,7 +36,6 @@ type TelemetryBuilder struct {
|
|||
ReceiverRefusedLogRecords metric.Int64Counter
|
||||
ReceiverRefusedMetricPoints metric.Int64Counter
|
||||
ReceiverRefusedSpans metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -52,43 +52,43 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ReceiverAcceptedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverAcceptedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_accepted_log_records",
|
||||
metric.WithDescription("Number of log records successfully pushed into the pipeline."),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ReceiverAcceptedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverAcceptedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_accepted_metric_points",
|
||||
metric.WithDescription("Number of metric points successfully pushed into the pipeline."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ReceiverAcceptedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverAcceptedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_accepted_spans",
|
||||
metric.WithDescription("Number of spans successfully pushed into the pipeline."),
|
||||
metric.WithUnit("{spans}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ReceiverRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverRefusedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_refused_log_records",
|
||||
metric.WithDescription("Number of log records that could not be pushed into the pipeline."),
|
||||
metric.WithUnit("{records}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ReceiverRefusedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverRefusedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_refused_metric_points",
|
||||
metric.WithDescription("Number of metric points that could not be pushed into the pipeline."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ReceiverRefusedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ReceiverRefusedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_receiver_refused_spans",
|
||||
metric.WithDescription("Number of spans that could not be pushed into the pipeline."),
|
||||
metric.WithUnit("{spans}"),
|
||||
|
|
@ -96,3 +96,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,14 +26,15 @@ type componentTestTelemetry struct {
|
|||
|
||||
func (tt *componentTestTelemetry) NewSettings() receiver.Settings {
|
||||
set := receivertest.NewNopSettings()
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
set.ID = component.NewID(component.MustNewType("scraperhelper"))
|
||||
set.TelemetrySettings = tt.newTelemetrySettings()
|
||||
return set
|
||||
}
|
||||
|
||||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,17 +6,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/receiver/scraperhelper")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/receiver/scraperhelper")
|
||||
}
|
||||
|
|
@ -31,7 +32,6 @@ type TelemetryBuilder struct {
|
|||
meter metric.Meter
|
||||
ScraperErroredMetricPoints metric.Int64Counter
|
||||
ScraperScrapedMetricPoints metric.Int64Counter
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -48,19 +48,19 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ScraperErroredMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ScraperErroredMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_scraper_errored_metric_points",
|
||||
metric.WithDescription("Number of metric points that were unable to be scraped."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ScraperScrapedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
|
||||
builder.ScraperScrapedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter(
|
||||
"otelcol_scraper_scraped_metric_points",
|
||||
metric.WithDescription("Number of metric points successfully scraped."),
|
||||
metric.WithUnit("{datapoints}"),
|
||||
|
|
@ -68,3 +68,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
|
|||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ type componentTestTelemetry struct {
|
|||
func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings {
|
||||
set := componenttest.NewNopTelemetrySettings()
|
||||
set.MeterProvider = tt.meterProvider
|
||||
set.MetricsLevel = configtelemetry.LevelDetailed
|
||||
set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return tt.meterProvider
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,17 +7,18 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
)
|
||||
|
||||
// Deprecated: [v0.108.0] use LeveledMeter instead.
|
||||
func Meter(settings component.TelemetrySettings) metric.Meter {
|
||||
return settings.MeterProvider.Meter("go.opentelemetry.io/collector/service")
|
||||
}
|
||||
|
||||
// Deprecated: [v0.114.0] use Meter instead.
|
||||
func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter {
|
||||
return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/service")
|
||||
}
|
||||
|
|
@ -42,7 +43,6 @@ type TelemetryBuilder struct {
|
|||
observeProcessRuntimeTotalSysMemoryBytes func(context.Context, metric.Observer) error
|
||||
ProcessUptime metric.Float64ObservableCounter
|
||||
observeProcessUptime func(context.Context, metric.Observer) error
|
||||
meters map[configtelemetry.Level]metric.Meter
|
||||
}
|
||||
|
||||
// TelemetryBuilderOption applies changes to default builder.
|
||||
|
|
@ -119,59 +119,66 @@ func WithProcessUptimeCallback(cb func() float64, opts ...metric.ObserveOption)
|
|||
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
|
||||
// for a component
|
||||
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
|
||||
builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}}
|
||||
builder := TelemetryBuilder{}
|
||||
for _, op := range options {
|
||||
op.apply(&builder)
|
||||
}
|
||||
builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic)
|
||||
builder.meter = Meter(settings)
|
||||
var err, errs error
|
||||
builder.ProcessCPUSeconds, err = builder.meters[configtelemetry.LevelBasic].Float64ObservableCounter(
|
||||
builder.ProcessCPUSeconds, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64ObservableCounter(
|
||||
"otelcol_process_cpu_seconds",
|
||||
metric.WithDescription("Total CPU user and system time in seconds"),
|
||||
metric.WithUnit("s"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessMemoryRss, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.ProcessMemoryRss, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge(
|
||||
"otelcol_process_memory_rss",
|
||||
metric.WithDescription("Total physical memory (resident set size)"),
|
||||
metric.WithUnit("By"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessRuntimeHeapAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.ProcessRuntimeHeapAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge(
|
||||
"otelcol_process_runtime_heap_alloc_bytes",
|
||||
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
|
||||
metric.WithUnit("By"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessRuntimeTotalAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableCounter(
|
||||
builder.ProcessRuntimeTotalAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableCounter(
|
||||
"otelcol_process_runtime_total_alloc_bytes",
|
||||
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
|
||||
metric.WithUnit("By"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessRuntimeTotalSysMemoryBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge(
|
||||
builder.ProcessRuntimeTotalSysMemoryBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge(
|
||||
"otelcol_process_runtime_total_sys_memory_bytes",
|
||||
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
|
||||
metric.WithUnit("By"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes)
|
||||
errs = errors.Join(errs, err)
|
||||
builder.ProcessUptime, err = builder.meters[configtelemetry.LevelBasic].Float64ObservableCounter(
|
||||
builder.ProcessUptime, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64ObservableCounter(
|
||||
"otelcol_process_uptime",
|
||||
metric.WithDescription("Uptime of the process"),
|
||||
metric.WithUnit("s"),
|
||||
)
|
||||
errs = errors.Join(errs, err)
|
||||
_, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime)
|
||||
_, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime)
|
||||
errs = errors.Join(errs, err)
|
||||
return &builder, errs
|
||||
}
|
||||
|
||||
func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter {
|
||||
if cfgLevel <= srvLevel {
|
||||
return meter
|
||||
}
|
||||
return noop.Meter{}
|
||||
}
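Several of the service metrics above are observable instruments: they are created once and report values through a callback registered on the meter, which is why NewTelemetryBuilder calls RegisterCallback on the same (possibly no-op) meter that created the instrument. A short illustrative sketch of that pairing (names are made up, not the generated code):

```go
// Sketch: creating an observable counter and registering its callback on the
// same meter, as NewTelemetryBuilder does for the process_* metrics above.
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/noop"
)

func registerUptime(meter metric.Meter, start time.Time) error {
	uptime, err := meter.Float64ObservableCounter(
		"example_process_uptime",
		metric.WithDescription("Uptime of the process"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return err
	}
	// The callback runs on every collection and reports the current value.
	_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveFloat64(uptime, time.Since(start).Seconds())
		return nil
	}, uptime)
	return err
}

func main() {
	_ = registerUptime(noop.Meter{}, time.Now()) // no-op meter: nothing is recorded
}
```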
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import (
|
|||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
io_prometheus_client "github.com/prometheus/client_model/go"
|
||||
promclient "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -27,8 +27,8 @@ import (
|
|||
)
|
||||
|
||||
type testTelemetry struct {
|
||||
component.TelemetrySettings
|
||||
promHandler http.Handler
|
||||
TelemetrySettings component.TelemetrySettings
|
||||
promHandler http.Handler
|
||||
}
|
||||
|
||||
var expectedMetrics = []string{
|
||||
|
|
@ -54,10 +54,8 @@ func setupTelemetry(t *testing.T) testTelemetry {
|
|||
sdkmetric.WithReader(exporter),
|
||||
)
|
||||
|
||||
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return meterProvider
|
||||
}
|
||||
|
||||
settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelDetailed
|
||||
settings.TelemetrySettings.MeterProvider = meterProvider
|
||||
settings.TelemetrySettings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
|
||||
return meterProvider
|
||||
}
|
||||
|
|
@ -69,7 +67,7 @@ func setupTelemetry(t *testing.T) testTelemetry {
|
|||
return settings
|
||||
}
|
||||
|
||||
func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) {
|
||||
func fetchPrometheusMetrics(handler http.Handler) (map[string]*promclient.MetricFamily, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, "/metrics", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -95,7 +93,7 @@ func TestProcessTelemetry(t *testing.T) {
|
|||
require.True(t, ok)
|
||||
require.Len(t, metric.Metric, 1)
|
||||
var metricValue float64
|
||||
if metric.GetType() == io_prometheus_client.MetricType_COUNTER {
|
||||
if metric.GetType() == promclient.MetricType_COUNTER {
|
||||
metricValue = metric.Metric[0].GetCounter().GetValue()
|
||||
} else {
|
||||
metricValue = metric.Metric[0].GetGauge().GetValue()
|
||||
|
|
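fetchPrometheusMetrics, shown above with its client_model import renamed to promclient, scrapes the handler and decodes the text exposition into promclient.MetricFamily values. A hedged sketch of just the decoding step with expfmt (the HTTP plumbing is omitted and the input is a literal string):

```go
// Sketch: decoding a Prometheus text exposition into MetricFamily values,
// the same decoding fetchPrometheusMetrics relies on.
package main

import (
	"fmt"
	"strings"

	promclient "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

func main() {
	body := "# TYPE otelcol_process_uptime counter\notelcol_process_uptime 42\n"

	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	var mf *promclient.MetricFamily = families["otelcol_process_uptime"]
	fmt.Println(mf.GetMetric()[0].GetCounter().GetValue()) // 42
}
```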
|
|||