Make registration of callback for async metric always optional (#12204)

This PR also solves one more issues with the new interface, which is the
ability to record multiple observations for a single async metric using
one callback.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2025-01-29 15:58:43 -08:00 committed by GitHub
parent 74bff40691
commit 3203167e9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 924 additions and 412 deletions

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: mdatagen
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: All register callbacks to async instruments can now be unregistered by calling `metadata.TelemetryBuilder.Shutdown()`
# One or more tracking issues or pull requests related to the change
issues: [12204]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: mdatagen
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make registration of callback for async metric always optional.
# One or more tracking issues or pull requests related to the change
issues: [12204]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Deprecate `metadata.TelemetryBuilder.Init*` and `metadata.With*Callback` in favor of `metadata.TelemetryBuilder.Register*Callback`
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]

View File

@ -601,10 +601,10 @@ package metadata
import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
)
func Meter(settings component.TelemetrySettings) metric.Meter {
@ -634,10 +634,10 @@ package metadata
import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
)
func Meter(settings component.TelemetrySettings) metric.Meter {

View File

@ -6,6 +6,8 @@ package samplereceiver // import "go.opentelemetry.io/collector/cmd/mdatagen/int
import (
"context"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/collector/cmd/mdatagen/internal/samplereceiver/internal/metadata"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
@ -27,10 +29,18 @@ func createTraces(context.Context, receiver.Settings, component.Config, consumer
}
func createMetrics(ctx context.Context, set receiver.Settings, _ component.Config, _ consumer.Metrics) (receiver.Metrics, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings, metadata.WithProcessRuntimeTotalAllocBytesCallback(func() int64 { return 2 }))
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}
err = telemetryBuilder.RegisterProcessRuntimeTotalAllocBytesCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(2)
return nil
})
if err != nil {
return nil, err
}
telemetryBuilder.BatchSizeTriggerSend.Add(ctx, 1)
return nopReceiver{telemetryBuilder: telemetryBuilder}, nil
}
@ -43,10 +53,20 @@ var nopInstance = &nopReceiver{}
type nopReceiver struct {
component.StartFunc
component.ShutdownFunc
telemetryBuilder *metadata.TelemetryBuilder
}
func (r nopReceiver) initOptionalMetric() {
_, _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 })
_ = r.telemetryBuilder.RegisterQueueLengthCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(3)
return nil
})
}
// Shutdown shuts down the component.
func (r nopReceiver) Shutdown(context.Context) error {
if r.telemetryBuilder != nil {
r.telemetryBuilder.Shutdown()
}
return nil
}

View File

@ -5,8 +5,10 @@ package metadata
import (
"context"
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -23,9 +25,12 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// TelemetryBuilder provides an interface for components to report telemetry
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
BatchSizeTriggerSend metric.Int64Counter
ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
BatchSizeTriggerSend metric.Int64Counter
ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessRuntimeTotalAllocBytes func(context.Context, metric.Observer) error
QueueCapacity metric.Int64Gauge
QueueLength metric.Int64ObservableGauge
@ -43,7 +48,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// WithProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric.
// Deprecated: [v0.119.0] use RegisterProcessRuntimeTotalAllocBytesCallback.
func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessRuntimeTotalAllocBytes = func(_ context.Context, o metric.Observer) error {
@ -53,17 +58,23 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O
})
}
// InitQueueLength configures the QueueLength metric.
func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) {
var err error
builder.QueueLength, err = builder.meter.Int64ObservableGauge(
"otelcol_queue_length",
metric.WithDescription("This metric is optional and therefore not initialized in NewTelemetryBuilder. [alpha]"),
metric.WithUnit("{items}"),
)
// RegisterProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric.
func (builder *TelemetryBuilder) RegisterProcessRuntimeTotalAllocBytesCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessRuntimeTotalAllocBytes, obs: o})
return nil
}, builder.ProcessRuntimeTotalAllocBytes)
if err != nil {
return nil, err
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterQueueLengthCallback.
func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) {
reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(builder.QueueLength, cb(), opts...)
return nil
@ -71,6 +82,40 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric
return reg, err
}
// RegisterQueueLengthCallback sets callback for observable QueueLength metric.
func (builder *TelemetryBuilder) RegisterQueueLengthCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.QueueLength, obs: o})
return nil
}, builder.QueueLength)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
type observerInt64 struct {
embedded.Int64Observer
inst metric.Int64Observable
obs metric.Observer
}
func (oi *observerInt64) Observe(value int64, opts ...metric.ObserveOption) {
oi.obs.ObserveInt64(oi.inst, value, opts...)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
@ -92,14 +137,25 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
metric.WithUnit("By"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
errs = errors.Join(errs, err)
if builder.observeProcessRuntimeTotalAllocBytes != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.QueueCapacity, err = builder.meter.Int64Gauge(
"otelcol_queue_capacity",
metric.WithDescription("Queue capacity - sync gauge example."),
metric.WithUnit("{items}"),
)
errs = errors.Join(errs, err)
builder.QueueLength, err = builder.meter.Int64ObservableGauge(
"otelcol_queue_length",
metric.WithDescription("This metric is optional and therefore not initialized in NewTelemetryBuilder. [alpha]"),
metric.WithUnit("{items}"),
)
errs = errors.Join(errs, err)
builder.RequestDuration, err = builder.meter.Float64Histogram(
"otelcol_request_duration",
metric.WithDescription("Duration of request [alpha]"),

View File

@ -87,6 +87,19 @@ func AssertEqualQueueCapacity(t *testing.T, tt componenttest.Telemetry, dps []me
metricdatatest.AssertEqual(t, want, got, opts...)
}
func AssertEqualQueueLength(t *testing.T, tt componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_queue_length",
Description: "This metric is optional and therefore not initialized in NewTelemetryBuilder. [alpha]",
Unit: "{items}",
Data: metricdata.Gauge[int64]{
DataPoints: dps,
},
}
got := getMetric(t, tt, "otelcol_queue_length")
metricdatatest.AssertEqual(t, want, got, opts...)
}
func AssertEqualRequestDuration(t *testing.T, tt componenttest.Telemetry, dps []metricdata.HistogramDataPoint[float64], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_request_duration",

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
@ -15,12 +16,17 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
metadata.WithProcessRuntimeTotalAllocBytesCallback(func() int64 { return 1 }),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
require.NoError(t, tb.RegisterProcessRuntimeTotalAllocBytesCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterQueueLengthCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
tb.BatchSizeTriggerSend.Add(context.Background(), 1)
tb.QueueCapacity.Record(context.Background(), 1)
tb.RequestDuration.Record(context.Background(), 1)
@ -60,6 +66,16 @@ func TestSetupTelemetry(t *testing.T) {
},
},
},
{
Name: "otelcol_queue_length",
Description: "This metric is optional and therefore not initialized in NewTelemetryBuilder. [alpha]",
Unit: "{items}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{},
},
},
},
{
Name: "otelcol_request_duration",
Description: "Duration of request [alpha]",
@ -72,22 +88,21 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualBatchSizeTriggerSend(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessRuntimeTotalAllocBytes(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualQueueCapacity(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualQueueLength(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualRequestDuration(t, testTel.Telemetry,
[]metricdata.HistogramDataPoint[float64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.HistogramDataPoint[float64]{{}}, metricdatatest.IgnoreValue(),
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -45,48 +45,11 @@ func TestComponentTelemetry(t *testing.T) {
rcv, ok := receiver.(nopReceiver)
require.True(t, ok)
rcv.initOptionalMetric()
// TODO: this needs to be replaces once the optional metric is supported in AssertMetricEqual functions.
tt.AssertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_batch_size_trigger_send",
Description: "Number of times the batch was sent due to a size trigger [deprecated since v0.110.0]",
Unit: "{times}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
},
},
metadatatest.AssertEqualQueueLength(t, tt.Telemetry,
[]metricdata.DataPoint[int64]{
{
Value: 3,
},
},
{
Name: "otelcol_process_runtime_total_alloc_bytes",
Description: "Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')",
Unit: "By",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
},
},
},
},
{
Name: "otelcol_queue_length",
Description: "This metric is optional and therefore not initialized in NewTelemetryBuilder. [alpha]",
Unit: "{items}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
},
},
},
},
}, metricdatatest.IgnoreTimestamp())
}, metricdatatest.IgnoreTimestamp())
require.NoError(t, tt.Shutdown(context.Background()))
}

View File

@ -7,20 +7,19 @@ import (
{{- range $_, $metric := .Telemetry.Metrics }}
{{- if $metric.Data.Async }}
"context"
{{- break}}
"sync"
{{- break}}
{{- end }}
{{- end }}
"errors"
{{- end }}
"go.opentelemetry.io/otel/metric"
{{- if .Telemetry.Metrics }}
noopmetric "go.opentelemetry.io/otel/metric/noop"
{{- end }}
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
)
func Meter(settings component.TelemetrySettings) metric.Meter {
@ -36,9 +35,12 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
{{- range $name, $metric := .Telemetry.Metrics }}
{{ $name.Render }} metric.{{ $metric.Data.Instrument }}
{{- if and ($metric.Data.Async) (not $metric.Optional) }}
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observe{{ $name.Render }} func(context.Context, metric.Observer) error
{{- end }}
{{- end }}
@ -55,35 +57,19 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if $metric.Optional }}
// Init{{ $name.Render }} configures the {{ $name.Render }} metric.
{{- if $metric.Optional }}
// Deprecated: [v0.119.0] use Register{{ $name.Render }}Callback.
func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}, {{- end }}opts ...metric.ObserveOption) (metric.Registration, error) {
var err error
builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}(
"otelcol_{{ $name }}",
metric.WithDescription("{{ $metric.Description }}{{ $metric.Stability }}"),
metric.WithUnit("{{ $metric.Unit }}"),
{{- if eq $metric.Data.Type "Histogram" -}}
{{ if $metric.Data.Boundaries -}}metric.WithExplicitBucketBoundaries([]float64{ {{- range $metric.Data.Boundaries }} {{.}}, {{- end }} }...),{{- end }}
{{- end }}
)
{{- if $metric.Data.Async }}
if err != nil {
return nil, err
}
reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...)
return nil
}, builder.{{ $name.Render }})
{{- end }}
return reg, err
}
{{- else }}
{{ if $metric.Data.Async -}}
// With{{ $name.Render }}Callback sets callback for observable {{ $name.Render }} metric.
{{ if $metric.Data.Async -}}
// Deprecated: [v0.119.0] use Register{{ $name.Render }}Callback.
func With{{ $name.Render }}Callback(cb func() {{ $metric.Data.BasicType }}, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observe{{ $name.Render }} = func(_ context.Context, o metric.Observer) error {
@ -92,10 +78,70 @@ func With{{ $name.Render }}Callback(cb func() {{ $metric.Data.BasicType }}, opts
}
})
}
{{- end }}
{{- end }}
{{ if $metric.Data.Async -}}
// Register{{ $name.Render }}Callback sets callback for observable {{ $name.Render }} metric.
func (builder *TelemetryBuilder) Register{{ $name.Render }}Callback(cb metric.{{ casesTitle $metric.Data.BasicType }}Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observer{{ casesTitle $metric.Data.BasicType }}{inst : builder.{{ $name.Render }}, obs: o})
return nil
}, builder.{{ $name.Render }})
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
{{- end }}
{{- end }}
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if $metric.Data.Async }}
{{ if eq $metric.Data.BasicType "int64" -}}
type observerInt64 struct {
embedded.Int64Observer
inst metric.Int64Observable
obs metric.Observer
}
func (oi *observerInt64) Observe(value int64, opts ...metric.ObserveOption) {
oi.obs.ObserveInt64(oi.inst, value, opts...)
}
{{ break }}
{{- end }}
{{- end }}
{{- end }}
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if $metric.Data.Async }}
{{ if eq $metric.Data.BasicType "float64" -}}
type observerFloat64 struct {
embedded.Float64Observer
inst metric.Float64Observable
obs metric.Observer
}
func (oi *observerFloat64) Observe(value float64, opts ...metric.ObserveOption) {
oi.obs.ObserveFloat64(oi.inst, value, opts...)
}
{{ break }}
{{- end }}
{{- end }}
{{- end }}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
@ -108,7 +154,6 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
var err, errs error
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if not $metric.Optional }}
builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}(
"otelcol_{{ $name }}",
metric.WithDescription("{{ $metric.Description }}{{ $metric.Stability }}"),
@ -118,10 +163,14 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
{{- end }}
)
errs = errors.Join(errs, err)
{{- if $metric.Data.Async }}
_, err = builder.meter.RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }})
errs = errors.Join(errs, err)
{{- end }}
{{- if and ($metric.Data.Async) (not $metric.Optional) }}
if builder.observe{{ $name.Render }} != nil {
reg, err := builder.meter.RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }})
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
{{- end }}
{{- end }}
return &builder, errs

View File

@ -51,7 +51,7 @@ func (tt *Telemetry) AssertMetrics(t *testing.T, expected []metricdata.Metrics,
}
{{ range $name, $metric := .Telemetry.Metrics }}
{{ if not $metric.Optional }}
func AssertEqual{{ $name.Render }}(t *testing.T, tt componenttest.Telemetry, dps []metricdata.{{- if eq $metric.Data.Type "Histogram" }} {{$metric.Data.Type}} {{- end }}DataPoint[{{ $metric.Data.BasicType }}], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_{{ $name }}",
@ -71,7 +71,6 @@ func AssertEqual{{ $name.Render }}(t *testing.T, tt componenttest.Telemetry, dps
metricdatatest.AssertEqual(t, want, got, opts...)
}
{{- end }}
{{- end }}
func getMetric(t *testing.T, tt componenttest.Telemetry, name string) metricdata.Metrics {

View File

@ -13,19 +13,21 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := {{ .Package }}.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
{{- $package := .Package -}}
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if (and (not $metric.Optional) $metric.Data.Async) }}
{{ $package }}.With{{ $name.Render }}Callback(func() {{ $metric.Data.BasicType }} { return 1 }),
{{- end }}
{{- end }}
)
require.NoError(t, err)
require.NotNil(t, tb)
tb, err := {{ .Package }}.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
defer tb.Shutdown()
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if (and (not $metric.Optional) (not $metric.Data.Async)) }}
{{- if $metric.Data.Async }}
require.NoError(t, tb.Register{{ $name.Render }}Callback(func(_ context.Context, observer metric.{{ casesTitle $metric.Data.BasicType }}Observer) error {
observer.Observe(1)
return nil
}))
{{- end }}
{{- end }}
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if not $metric.Data.Async }}
{{- if eq $metric.Data.Type "Sum" }}
tb.{{ $name.Render }}.Add(context.Background(), 1)
{{- else }}
@ -36,7 +38,6 @@ func TestSetupTelemetry(t *testing.T) {
testTel.AssertMetrics(t, []metricdata.Metrics{
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if not $metric.Optional }}
{
Name: "otelcol_{{ $name }}",
Description: "{{ $metric.Description }}{{ $metric.Stability }}",
@ -64,22 +65,19 @@ func TestSetupTelemetry(t *testing.T) {
},
{{- end }}
},
{{- end }}
{{- end }}
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
{{- range $name, $metric := .Telemetry.Metrics }}
{{ if not $metric.Optional }}
AssertEqual{{ $name.Render }}(t, testTel.Telemetry,
{{ if eq $metric.Data.Type "Gauge" -}}
[]metricdata.DataPoint[{{ $metric.Gauge.MetricValueType.BasicType }}]{{"{{}}"}},
[]metricdata.DataPoint[{{ $metric.Gauge.MetricValueType.BasicType }}]{{"{{Value: 1}}"}},
{{- else if eq $metric.Data.Type "Sum" -}}
[]metricdata.DataPoint[{{ $metric.Sum.MetricValueType.BasicType }}]{{"{{}}"}},
[]metricdata.DataPoint[{{ $metric.Sum.MetricValueType.BasicType }}]{{"{{Value: 1}}"}},
{{- else if eq $metric.Data.Type "Histogram" -}}
[]metricdata.HistogramDataPoint[{{ $metric.Histogram.MetricValueType.BasicType }}]{{"{{}}"}},
[]metricdata.HistogramDataPoint[{{ $metric.Histogram.MetricValueType.BasicType }}]{{"{{}}"}}, metricdatatest.IgnoreValue(),
{{- end }}
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
{{- end }}
metricdatatest.IgnoreTimestamp())
{{- end }}
require.NoError(t, testTel.Shutdown(context.Background()))

View File

@ -5,8 +5,10 @@ package metadata
import (
"context"
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -24,17 +26,23 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ExporterEnqueueFailedLogRecords metric.Int64Counter
ExporterEnqueueFailedMetricPoints metric.Int64Counter
ExporterEnqueueFailedSpans metric.Int64Counter
ExporterQueueCapacity metric.Int64ObservableGauge
ExporterQueueSize metric.Int64ObservableGauge
ExporterSendFailedLogRecords metric.Int64Counter
ExporterSendFailedMetricPoints metric.Int64Counter
ExporterSendFailedSpans metric.Int64Counter
ExporterSentLogRecords metric.Int64Counter
ExporterSentMetricPoints metric.Int64Counter
ExporterSentSpans metric.Int64Counter
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeExporterQueueCapacity func(context.Context, metric.Observer) error
ExporterQueueSize metric.Int64ObservableGauge
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeExporterQueueSize func(context.Context, metric.Observer) error
ExporterSendFailedLogRecords metric.Int64Counter
ExporterSendFailedMetricPoints metric.Int64Counter
ExporterSendFailedSpans metric.Int64Counter
ExporterSentLogRecords metric.Int64Counter
ExporterSentMetricPoints metric.Int64Counter
ExporterSentSpans metric.Int64Counter
}
// TelemetryBuilderOption applies changes to default builder.
@ -48,40 +56,73 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// InitExporterQueueCapacity configures the ExporterQueueCapacity metric.
func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) {
var err error
builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge(
"otelcol_exporter_queue_capacity",
metric.WithDescription("Fixed capacity of the retry queue (in batches) [alpha]"),
metric.WithUnit("{batches}"),
)
if err != nil {
return nil, err
}
reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...)
return nil
}, builder.ExporterQueueCapacity)
return reg, err
// Deprecated: [v0.119.0] use RegisterExporterQueueCapacityCallback.
func WithExporterQueueCapacityCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeExporterQueueCapacity = func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...)
return nil
}
})
}
// InitExporterQueueSize configures the ExporterQueueSize metric.
func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) {
var err error
builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge(
"otelcol_exporter_queue_size",
metric.WithDescription("Current size of the retry queue (in batches) [alpha]"),
metric.WithUnit("{batches}"),
)
// RegisterExporterQueueCapacityCallback sets callback for observable ExporterQueueCapacity metric.
func (builder *TelemetryBuilder) RegisterExporterQueueCapacityCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ExporterQueueCapacity, obs: o})
return nil
}, builder.ExporterQueueCapacity)
if err != nil {
return nil, err
return err
}
reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...)
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterExporterQueueSizeCallback.
func WithExporterQueueSizeCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeExporterQueueSize = func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...)
return nil
}
})
}
// RegisterExporterQueueSizeCallback sets callback for observable ExporterQueueSize metric.
func (builder *TelemetryBuilder) RegisterExporterQueueSizeCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ExporterQueueSize, obs: o})
return nil
}, builder.ExporterQueueSize)
return reg, err
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
type observerInt64 struct {
embedded.Int64Observer
inst metric.Int64Observable
obs metric.Observer
}
func (oi *observerInt64) Observe(value int64, opts ...metric.ObserveOption) {
oi.obs.ObserveInt64(oi.inst, value, opts...)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
@ -111,6 +152,32 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
metric.WithUnit("{spans}"),
)
errs = errors.Join(errs, err)
builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge(
"otelcol_exporter_queue_capacity",
metric.WithDescription("Fixed capacity of the retry queue (in batches) [alpha]"),
metric.WithUnit("{batches}"),
)
errs = errors.Join(errs, err)
if builder.observeExporterQueueCapacity != nil {
reg, err := builder.meter.RegisterCallback(builder.observeExporterQueueCapacity, builder.ExporterQueueCapacity)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge(
"otelcol_exporter_queue_size",
metric.WithDescription("Current size of the retry queue (in batches) [alpha]"),
metric.WithUnit("{batches}"),
)
errs = errors.Join(errs, err)
if builder.observeExporterQueueSize != nil {
reg, err := builder.meter.RegisterCallback(builder.observeExporterQueueSize, builder.ExporterQueueSize)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ExporterSendFailedLogRecords, err = builder.meter.Int64Counter(
"otelcol_exporter_send_failed_log_records",
metric.WithDescription("Number of log records in failed attempts to send to destination. [alpha]"),

View File

@ -89,6 +89,32 @@ func AssertEqualExporterEnqueueFailedSpans(t *testing.T, tt componenttest.Teleme
metricdatatest.AssertEqual(t, want, got, opts...)
}
func AssertEqualExporterQueueCapacity(t *testing.T, tt componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_exporter_queue_capacity",
Description: "Fixed capacity of the retry queue (in batches) [alpha]",
Unit: "{batches}",
Data: metricdata.Gauge[int64]{
DataPoints: dps,
},
}
got := getMetric(t, tt, "otelcol_exporter_queue_capacity")
metricdatatest.AssertEqual(t, want, got, opts...)
}
func AssertEqualExporterQueueSize(t *testing.T, tt componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_exporter_queue_size",
Description: "Current size of the retry queue (in batches) [alpha]",
Unit: "{batches}",
Data: metricdata.Gauge[int64]{
DataPoints: dps,
},
}
got := getMetric(t, tt, "otelcol_exporter_queue_size")
metricdatatest.AssertEqual(t, want, got, opts...)
}
func AssertEqualExporterSendFailedLogRecords(t *testing.T, tt componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) {
want := metricdata.Metrics{
Name: "otelcol_exporter_send_failed_log_records",

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
@ -15,11 +16,17 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
require.NoError(t, tb.RegisterExporterQueueCapacityCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterExporterQueueSizeCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
tb.ExporterEnqueueFailedLogRecords.Add(context.Background(), 1)
tb.ExporterEnqueueFailedMetricPoints.Add(context.Background(), 1)
tb.ExporterEnqueueFailedSpans.Add(context.Background(), 1)
@ -67,6 +74,26 @@ func TestSetupTelemetry(t *testing.T) {
},
},
},
{
Name: "otelcol_exporter_queue_capacity",
Description: "Fixed capacity of the retry queue (in batches) [alpha]",
Unit: "{batches}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{},
},
},
},
{
Name: "otelcol_exporter_queue_size",
Description: "Current size of the retry queue (in batches) [alpha]",
Unit: "{batches}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{},
},
},
},
{
Name: "otelcol_exporter_send_failed_log_records",
Description: "Number of log records in failed attempts to send to destination. [alpha]",
@ -140,42 +167,39 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualExporterEnqueueFailedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterEnqueueFailedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterEnqueueFailedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterQueueCapacity(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterQueueSize(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSendFailedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSendFailedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSendFailedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSentLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSentMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualExporterSentSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -75,10 +75,9 @@ type QueueSender struct {
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]
obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
shutdownFns []component.ShutdownFunc
obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
}
func NewQueueSender(
@ -132,25 +131,15 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
exporterAttr := attribute.String(ExporterKey, qs.exporterID.String())
dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String())
reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return qs.queue.Size() },
metric.WithAttributeSet(attribute.NewSet(exporterAttr, dataTypeAttr)))
if reg1 != nil {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
return reg1.Unregister()
})
}
reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return qs.queue.Capacity() },
metric.WithAttributeSet(attribute.NewSet(exporterAttr)))
if reg2 != nil {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
return reg2.Unregister()
})
}
return errors.Join(err1, err2)
return errors.Join(
qs.obsrep.TelemetryBuilder.RegisterExporterQueueSizeCallback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(qs.queue.Size(), metric.WithAttributeSet(attribute.NewSet(exporterAttr, dataTypeAttr)))
return nil
}),
qs.obsrep.TelemetryBuilder.RegisterExporterQueueCapacityCallback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(qs.queue.Capacity(), metric.WithAttributeSet(attribute.NewSet(exporterAttr)))
return nil
}))
}
// Shutdown is invoked during service shutdown.
@ -158,13 +147,8 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
for _, fn := range qs.shutdownFns {
err := fn(ctx)
if err != nil {
qs.logger.Warn("Error while shutting down QueueSender", zap.Error(err))
}
}
qs.shutdownFns = nil
// At the end, make sure metrics are un-registered since we want to free this object.
defer qs.obsrep.TelemetryBuilder.Shutdown()
if err := qs.queue.Shutdown(ctx); err != nil {
return err
@ -172,7 +156,8 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
if usePullingBasedExporterQueueBatcher.IsEnabled() {
return qs.batcher.Shutdown(ctx)
}
return qs.consumers.Shutdown(ctx)
err := qs.consumers.Shutdown(ctx)
return err
}
// send implements the requestSender interface. It puts the request in the queue.

View File

@ -105,7 +105,6 @@ telemetry:
level: alpha
description: Current size of the retry queue (in batches)
unit: "{batches}"
optional: true
gauge:
value_type: int
async: true
@ -116,7 +115,6 @@ telemetry:
level: alpha
description: Fixed capacity of the retry queue (in batches)
unit: "{batches}"
optional: true
gauge:
value_type: int
async: true

View File

@ -177,6 +177,7 @@ func (bp *batchProcessor[T]) Shutdown(context.Context) error {
// Wait until all goroutines are done.
bp.goroutines.Wait()
return nil
}

View File

@ -5,8 +5,10 @@ package metadata
import (
"context"
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -23,11 +25,14 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// TelemetryBuilder provides an interface for components to report telemetry
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
ProcessorBatchBatchSendSize metric.Int64Histogram
ProcessorBatchBatchSendSizeBytes metric.Int64Histogram
ProcessorBatchBatchSizeTriggerSend metric.Int64Counter
ProcessorBatchMetadataCardinality metric.Int64ObservableUpDownCounter
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ProcessorBatchBatchSendSize metric.Int64Histogram
ProcessorBatchBatchSendSizeBytes metric.Int64Histogram
ProcessorBatchBatchSizeTriggerSend metric.Int64Counter
ProcessorBatchMetadataCardinality metric.Int64ObservableUpDownCounter
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessorBatchMetadataCardinality func(context.Context, metric.Observer) error
ProcessorBatchTimeoutTriggerSend metric.Int64Counter
}
@ -43,7 +48,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// WithProcessorBatchMetadataCardinalityCallback sets callback for observable ProcessorBatchMetadataCardinality metric.
// Deprecated: [v0.119.0] use RegisterProcessorBatchMetadataCardinalityCallback.
func WithProcessorBatchMetadataCardinalityCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessorBatchMetadataCardinality = func(_ context.Context, o metric.Observer) error {
@ -53,6 +58,40 @@ func WithProcessorBatchMetadataCardinalityCallback(cb func() int64, opts ...metr
})
}
// RegisterProcessorBatchMetadataCardinalityCallback sets callback for observable ProcessorBatchMetadataCardinality metric.
func (builder *TelemetryBuilder) RegisterProcessorBatchMetadataCardinalityCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessorBatchMetadataCardinality, obs: o})
return nil
}, builder.ProcessorBatchMetadataCardinality)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
type observerInt64 struct {
embedded.Int64Observer
inst metric.Int64Observable
obs metric.Observer
}
func (oi *observerInt64) Observe(value int64, opts ...metric.ObserveOption) {
oi.obs.ObserveInt64(oi.inst, value, opts...)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
@ -88,8 +127,13 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
metric.WithUnit("{combinations}"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality)
errs = errors.Join(errs, err)
if builder.observeProcessorBatchMetadataCardinality != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessorBatchTimeoutTriggerSend, err = builder.meter.Int64Counter(
"otelcol_processor_batch_timeout_trigger_send",
metric.WithDescription("Number of times the batch was sent due to a timeout trigger"),

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
@ -15,12 +16,13 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return 1 }),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
require.NoError(t, tb.RegisterProcessorBatchMetadataCardinalityCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
tb.ProcessorBatchBatchSendSize.Record(context.Background(), 1)
tb.ProcessorBatchBatchSendSizeBytes.Record(context.Background(), 1)
tb.ProcessorBatchBatchSizeTriggerSend.Add(context.Background(), 1)
@ -86,26 +88,21 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualProcessorBatchBatchSendSize(t, testTel.Telemetry,
[]metricdata.HistogramDataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(),
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorBatchBatchSendSizeBytes(t, testTel.Telemetry,
[]metricdata.HistogramDataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(),
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorBatchBatchSizeTriggerSend(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorBatchMetadataCardinality(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorBatchTimeoutTriggerSend(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -31,10 +31,14 @@ type batchProcessorTelemetry struct {
func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) {
attrs := metric.WithAttributeSet(attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String())))
telemetryBuilder, err := metadata.NewTelemetryBuilder(
set.TelemetrySettings,
metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { return int64(currentMetadataCardinality()) }, attrs),
)
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}
err = telemetryBuilder.RegisterProcessorBatchMetadataCardinalityCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(int64(currentMetadataCardinality()), attrs)
return nil
})
if err != nil {
return nil, err
}

View File

@ -4,6 +4,7 @@ package metadata
import (
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
@ -23,6 +24,8 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ProcessorAcceptedLogRecords metric.Int64Counter
ProcessorAcceptedMetricPoints metric.Int64Counter
ProcessorAcceptedSpans metric.Int64Counter
@ -42,6 +45,15 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {

View File

@ -15,11 +15,9 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
tb.ProcessorAcceptedLogRecords.Add(context.Background(), 1)
tb.ProcessorAcceptedMetricPoints.Add(context.Background(), 1)
tb.ProcessorAcceptedSpans.Add(context.Background(), 1)
@ -101,30 +99,24 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualProcessorAcceptedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorAcceptedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorAcceptedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorRefusedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorRefusedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorRefusedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -4,6 +4,7 @@ package metadata
import (
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
@ -23,6 +24,8 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ProcessorIncomingItems metric.Int64Counter
ProcessorOutgoingItems metric.Int64Counter
}
@ -38,6 +41,15 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {

View File

@ -15,11 +15,9 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
tb.ProcessorIncomingItems.Add(context.Background(), 1)
tb.ProcessorOutgoingItems.Add(context.Background(), 1)
@ -49,14 +47,12 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualProcessorIncomingItems(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessorOutgoingItems(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -4,6 +4,7 @@ package metadata
import (
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
@ -23,6 +24,8 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ReceiverAcceptedLogRecords metric.Int64Counter
ReceiverAcceptedMetricPoints metric.Int64Counter
ReceiverAcceptedSpans metric.Int64Counter
@ -42,6 +45,15 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {

View File

@ -15,11 +15,9 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
tb.ReceiverAcceptedLogRecords.Add(context.Background(), 1)
tb.ReceiverAcceptedMetricPoints.Add(context.Background(), 1)
tb.ReceiverAcceptedSpans.Add(context.Background(), 1)
@ -101,30 +99,24 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualReceiverAcceptedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualReceiverAcceptedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualReceiverAcceptedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualReceiverRefusedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualReceiverRefusedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualReceiverRefusedSpans(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -4,6 +4,7 @@ package metadata
import (
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
@ -23,6 +24,8 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ScraperErroredLogRecords metric.Int64Counter
ScraperErroredMetricPoints metric.Int64Counter
ScraperScrapedLogRecords metric.Int64Counter
@ -40,6 +43,15 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {

View File

@ -15,11 +15,9 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
tb.ScraperErroredLogRecords.Add(context.Background(), 1)
tb.ScraperErroredMetricPoints.Add(context.Background(), 1)
tb.ScraperScrapedLogRecords.Add(context.Background(), 1)
@ -75,22 +73,18 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualScraperErroredLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualScraperErroredMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualScraperScrapedLogRecords(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualScraperScrapedMetricPoints(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -5,8 +5,10 @@ package metadata
import (
"context"
"errors"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -23,19 +25,27 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer {
// TelemetryBuilder provides an interface for components to report telemetry
// as defined in metadata and user config.
type TelemetryBuilder struct {
meter metric.Meter
ProcessCPUSeconds metric.Float64ObservableCounter
observeProcessCPUSeconds func(context.Context, metric.Observer) error
ProcessMemoryRss metric.Int64ObservableGauge
observeProcessMemoryRss func(context.Context, metric.Observer) error
ProcessRuntimeHeapAllocBytes metric.Int64ObservableGauge
observeProcessRuntimeHeapAllocBytes func(context.Context, metric.Observer) error
ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter
observeProcessRuntimeTotalAllocBytes func(context.Context, metric.Observer) error
ProcessRuntimeTotalSysMemoryBytes metric.Int64ObservableGauge
meter metric.Meter
mu sync.Mutex
registrations []metric.Registration
ProcessCPUSeconds metric.Float64ObservableCounter
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessCPUSeconds func(context.Context, metric.Observer) error
ProcessMemoryRss metric.Int64ObservableGauge
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessMemoryRss func(context.Context, metric.Observer) error
ProcessRuntimeHeapAllocBytes metric.Int64ObservableGauge
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessRuntimeHeapAllocBytes func(context.Context, metric.Observer) error
ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessRuntimeTotalAllocBytes func(context.Context, metric.Observer) error
ProcessRuntimeTotalSysMemoryBytes metric.Int64ObservableGauge
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessRuntimeTotalSysMemoryBytes func(context.Context, metric.Observer) error
ProcessUptime metric.Float64ObservableCounter
observeProcessUptime func(context.Context, metric.Observer) error
// TODO: Remove in v0.119.0 when remove deprecated funcs.
observeProcessUptime func(context.Context, metric.Observer) error
}
// TelemetryBuilderOption applies changes to default builder.
@ -49,7 +59,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
tbof(mb)
}
// WithProcessCPUSecondsCallback sets callback for observable ProcessCPUSeconds metric.
// Deprecated: [v0.119.0] use RegisterProcessCPUSecondsCallback.
func WithProcessCPUSecondsCallback(cb func() float64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessCPUSeconds = func(_ context.Context, o metric.Observer) error {
@ -59,7 +69,22 @@ func WithProcessCPUSecondsCallback(cb func() float64, opts ...metric.ObserveOpti
})
}
// WithProcessMemoryRssCallback sets callback for observable ProcessMemoryRss metric.
// RegisterProcessCPUSecondsCallback sets callback for observable ProcessCPUSeconds metric.
func (builder *TelemetryBuilder) RegisterProcessCPUSecondsCallback(cb metric.Float64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerFloat64{inst: builder.ProcessCPUSeconds, obs: o})
return nil
}, builder.ProcessCPUSeconds)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterProcessMemoryRssCallback.
func WithProcessMemoryRssCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessMemoryRss = func(_ context.Context, o metric.Observer) error {
@ -69,7 +94,22 @@ func WithProcessMemoryRssCallback(cb func() int64, opts ...metric.ObserveOption)
})
}
// WithProcessRuntimeHeapAllocBytesCallback sets callback for observable ProcessRuntimeHeapAllocBytes metric.
// RegisterProcessMemoryRssCallback sets callback for observable ProcessMemoryRss metric.
func (builder *TelemetryBuilder) RegisterProcessMemoryRssCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessMemoryRss, obs: o})
return nil
}, builder.ProcessMemoryRss)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterProcessRuntimeHeapAllocBytesCallback.
func WithProcessRuntimeHeapAllocBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessRuntimeHeapAllocBytes = func(_ context.Context, o metric.Observer) error {
@ -79,7 +119,22 @@ func WithProcessRuntimeHeapAllocBytesCallback(cb func() int64, opts ...metric.Ob
})
}
// WithProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric.
// RegisterProcessRuntimeHeapAllocBytesCallback sets callback for observable ProcessRuntimeHeapAllocBytes metric.
func (builder *TelemetryBuilder) RegisterProcessRuntimeHeapAllocBytesCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessRuntimeHeapAllocBytes, obs: o})
return nil
}, builder.ProcessRuntimeHeapAllocBytes)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterProcessRuntimeTotalAllocBytesCallback.
func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessRuntimeTotalAllocBytes = func(_ context.Context, o metric.Observer) error {
@ -89,7 +144,22 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O
})
}
// WithProcessRuntimeTotalSysMemoryBytesCallback sets callback for observable ProcessRuntimeTotalSysMemoryBytes metric.
// RegisterProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric.
func (builder *TelemetryBuilder) RegisterProcessRuntimeTotalAllocBytesCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessRuntimeTotalAllocBytes, obs: o})
return nil
}, builder.ProcessRuntimeTotalAllocBytes)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterProcessRuntimeTotalSysMemoryBytesCallback.
func WithProcessRuntimeTotalSysMemoryBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessRuntimeTotalSysMemoryBytes = func(_ context.Context, o metric.Observer) error {
@ -99,7 +169,22 @@ func WithProcessRuntimeTotalSysMemoryBytesCallback(cb func() int64, opts ...metr
})
}
// WithProcessUptimeCallback sets callback for observable ProcessUptime metric.
// RegisterProcessRuntimeTotalSysMemoryBytesCallback sets callback for observable ProcessRuntimeTotalSysMemoryBytes metric.
func (builder *TelemetryBuilder) RegisterProcessRuntimeTotalSysMemoryBytesCallback(cb metric.Int64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerInt64{inst: builder.ProcessRuntimeTotalSysMemoryBytes, obs: o})
return nil
}, builder.ProcessRuntimeTotalSysMemoryBytes)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
// Deprecated: [v0.119.0] use RegisterProcessUptimeCallback.
func WithProcessUptimeCallback(cb func() float64, opts ...metric.ObserveOption) TelemetryBuilderOption {
return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) {
builder.observeProcessUptime = func(_ context.Context, o metric.Observer) error {
@ -109,6 +194,50 @@ func WithProcessUptimeCallback(cb func() float64, opts ...metric.ObserveOption)
})
}
// RegisterProcessUptimeCallback sets callback for observable ProcessUptime metric.
func (builder *TelemetryBuilder) RegisterProcessUptimeCallback(cb metric.Float64Callback) error {
reg, err := builder.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
cb(ctx, &observerFloat64{inst: builder.ProcessUptime, obs: o})
return nil
}, builder.ProcessUptime)
if err != nil {
return err
}
builder.mu.Lock()
defer builder.mu.Unlock()
builder.registrations = append(builder.registrations, reg)
return nil
}
type observerInt64 struct {
embedded.Int64Observer
inst metric.Int64Observable
obs metric.Observer
}
func (oi *observerInt64) Observe(value int64, opts ...metric.ObserveOption) {
oi.obs.ObserveInt64(oi.inst, value, opts...)
}
type observerFloat64 struct {
embedded.Float64Observer
inst metric.Float64Observable
obs metric.Observer
}
func (oi *observerFloat64) Observe(value float64, opts ...metric.ObserveOption) {
oi.obs.ObserveFloat64(oi.inst, value, opts...)
}
// Shutdown unregister all registered callbacks for async instruments.
func (builder *TelemetryBuilder) Shutdown() {
builder.mu.Lock()
defer builder.mu.Unlock()
for _, reg := range builder.registrations {
reg.Unregister()
}
}
// NewTelemetryBuilder provides a struct with methods to update all internal telemetry
// for a component
func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) {
@ -124,47 +253,77 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
metric.WithUnit("s"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds)
errs = errors.Join(errs, err)
if builder.observeProcessCPUSeconds != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessMemoryRss, err = builder.meter.Int64ObservableGauge(
"otelcol_process_memory_rss",
metric.WithDescription("Total physical memory (resident set size) [alpha]"),
metric.WithUnit("By"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss)
errs = errors.Join(errs, err)
if builder.observeProcessMemoryRss != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessRuntimeHeapAllocBytes, err = builder.meter.Int64ObservableGauge(
"otelcol_process_runtime_heap_alloc_bytes",
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc') [alpha]"),
metric.WithUnit("By"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes)
errs = errors.Join(errs, err)
if builder.observeProcessRuntimeHeapAllocBytes != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessRuntimeTotalAllocBytes, err = builder.meter.Int64ObservableCounter(
"otelcol_process_runtime_total_alloc_bytes",
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc') [alpha]"),
metric.WithUnit("By"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
errs = errors.Join(errs, err)
if builder.observeProcessRuntimeTotalAllocBytes != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessRuntimeTotalSysMemoryBytes, err = builder.meter.Int64ObservableGauge(
"otelcol_process_runtime_total_sys_memory_bytes",
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys') [alpha]"),
metric.WithUnit("By"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes)
errs = errors.Join(errs, err)
if builder.observeProcessRuntimeTotalSysMemoryBytes != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
builder.ProcessUptime, err = builder.meter.Float64ObservableCounter(
"otelcol_process_uptime",
metric.WithDescription("Uptime of the process [alpha]"),
metric.WithUnit("s"),
)
errs = errors.Join(errs, err)
_, err = builder.meter.RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime)
errs = errors.Join(errs, err)
if builder.observeProcessUptime != nil {
reg, err := builder.meter.RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime)
errs = errors.Join(errs, err)
if err == nil {
builder.registrations = append(builder.registrations, reg)
}
}
return &builder, errs
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
@ -15,17 +16,33 @@ import (
func TestSetupTelemetry(t *testing.T) {
testTel := SetupTelemetry()
tb, err := metadata.NewTelemetryBuilder(
testTel.NewTelemetrySettings(),
metadata.WithProcessCPUSecondsCallback(func() float64 { return 1 }),
metadata.WithProcessMemoryRssCallback(func() int64 { return 1 }),
metadata.WithProcessRuntimeHeapAllocBytesCallback(func() int64 { return 1 }),
metadata.WithProcessRuntimeTotalAllocBytesCallback(func() int64 { return 1 }),
metadata.WithProcessRuntimeTotalSysMemoryBytesCallback(func() int64 { return 1 }),
metadata.WithProcessUptimeCallback(func() float64 { return 1 }),
)
tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
require.NoError(t, err)
require.NotNil(t, tb)
defer tb.Shutdown()
require.NoError(t, tb.RegisterProcessCPUSecondsCallback(func(_ context.Context, observer metric.Float64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterProcessMemoryRssCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterProcessRuntimeHeapAllocBytesCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterProcessRuntimeTotalAllocBytesCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterProcessRuntimeTotalSysMemoryBytesCallback(func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(1)
return nil
}))
require.NoError(t, tb.RegisterProcessUptimeCallback(func(_ context.Context, observer metric.Float64Observer) error {
observer.Observe(1)
return nil
}))
testTel.AssertMetrics(t, []metricdata.Metrics{
{
@ -95,30 +112,24 @@ func TestSetupTelemetry(t *testing.T) {
},
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
AssertEqualProcessCPUSeconds(t, testTel.Telemetry,
[]metricdata.DataPoint[float64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[float64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessMemoryRss(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessRuntimeHeapAllocBytes(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessRuntimeTotalAllocBytes(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessRuntimeTotalSysMemoryBytes(t, testTel.Telemetry,
[]metricdata.DataPoint[int64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[int64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
AssertEqualProcessUptime(t, testTel.Telemetry,
[]metricdata.DataPoint[float64]{{}},
metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
[]metricdata.DataPoint[float64]{{Value: 1}},
metricdatatest.IgnoreTimestamp())
require.NoError(t, testTel.Shutdown(context.Background()))
}

View File

@ -5,6 +5,7 @@ package proctelemetry // import "go.opentelemetry.io/collector/service/internal/
import (
"context"
"errors"
"os"
"runtime"
"sync"
@ -12,6 +13,7 @@ import (
"github.com/shirou/gopsutil/v4/common"
"github.com/shirou/gopsutil/v4/process"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/metadata"
@ -74,63 +76,72 @@ func RegisterProcessMetrics(cfg component.TelemetrySettings, opts ...RegisterOpt
return err
}
_, err = metadata.NewTelemetryBuilder(cfg,
metadata.WithProcessUptimeCallback(pm.updateProcessUptime),
metadata.WithProcessRuntimeHeapAllocBytesCallback(pm.updateAllocMem),
metadata.WithProcessRuntimeTotalAllocBytesCallback(pm.updateTotalAllocMem),
metadata.WithProcessRuntimeTotalSysMemoryBytesCallback(pm.updateSysMem),
metadata.WithProcessCPUSecondsCallback(pm.updateCPUSeconds),
metadata.WithProcessMemoryRssCallback(pm.updateRSSMemory),
tb, err := metadata.NewTelemetryBuilder(cfg)
if err != nil {
return err
}
return errors.Join(
tb.RegisterProcessUptimeCallback(pm.updateProcessUptime),
tb.RegisterProcessRuntimeHeapAllocBytesCallback(pm.updateAllocMem),
tb.RegisterProcessRuntimeTotalAllocBytesCallback(pm.updateTotalAllocMem),
tb.RegisterProcessRuntimeTotalSysMemoryBytesCallback(pm.updateSysMem),
tb.RegisterProcessCPUSecondsCallback(pm.updateCPUSeconds),
tb.RegisterProcessMemoryRssCallback(pm.updateRSSMemory),
)
return err
}
func (pm *processMetrics) updateProcessUptime() float64 {
func (pm *processMetrics) updateProcessUptime(_ context.Context, obs metric.Float64Observer) error {
now := time.Now().UnixNano()
return float64(now-pm.startTimeUnixNano) / 1e9
obs.Observe(float64(now-pm.startTimeUnixNano) / 1e9)
return nil
}
func (pm *processMetrics) updateAllocMem() int64 {
func (pm *processMetrics) updateAllocMem(_ context.Context, obs metric.Int64Observer) error {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.readMemStatsIfNeeded()
//nolint:gosec
return int64(pm.ms.Alloc)
obs.Observe(int64(pm.ms.Alloc))
return nil
}
func (pm *processMetrics) updateTotalAllocMem() int64 {
func (pm *processMetrics) updateTotalAllocMem(_ context.Context, obs metric.Int64Observer) error {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.readMemStatsIfNeeded()
//nolint:gosec
return int64(pm.ms.TotalAlloc)
obs.Observe(int64(pm.ms.TotalAlloc))
return nil
}
func (pm *processMetrics) updateSysMem() int64 {
func (pm *processMetrics) updateSysMem(_ context.Context, obs metric.Int64Observer) error {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.readMemStatsIfNeeded()
//nolint:gosec
return int64(pm.ms.Sys)
obs.Observe(int64(pm.ms.Sys))
return nil
}
func (pm *processMetrics) updateCPUSeconds() float64 {
func (pm *processMetrics) updateCPUSeconds(_ context.Context, obs metric.Float64Observer) error {
times, err := pm.proc.TimesWithContext(pm.context)
if err != nil {
return 0
return err
}
return times.User + times.System + times.Idle + times.Nice +
times.Iowait + times.Irq + times.Softirq + times.Steal
obs.Observe(times.User + times.System + times.Idle + times.Nice +
times.Iowait + times.Irq + times.Softirq + times.Steal)
return nil
}
func (pm *processMetrics) updateRSSMemory() int64 {
func (pm *processMetrics) updateRSSMemory(_ context.Context, obs metric.Int64Observer) error {
mem, err := pm.proc.MemoryInfoWithContext(pm.context)
if err != nil {
return 0
return err
}
//nolint:gosec
return int64(mem.RSS)
obs.Observe(int64(mem.RSS))
return nil
}
func (pm *processMetrics) readMemStatsIfNeeded() {