Automate status reporting on start (#8836)

This is part of the continued component status reporting effort.
Currently we have automated status reporting for the following component
lifecycle events: `Starting`, `Stopping`, `Stopped` as well as
definitive errors that occur in the starting or stopping process (e.g.
as determined by an error return value). This leaves the responsibility
to the component to report runtime status after start and before stop.
We'd like to be able to extend the automatic status reporting to report
`StatusOK` if `Start` completes without an error. One complication with
this approach is that some components spawn async work (via goroutines)
that, depending on the Go scheduler, can report status before `Start`
returns. As such, we cannot assume a nil return value from `Start` means
the component has started properly. The solution is to detect if the
component has already reported status when start returns, if it has, we
will use the component-reported status and will not automatically report
status. If it hasn't, and `Start` returns without an error, we can
report `StatusOK`. Any subsequent reports from the component (async or
otherwise) will transition the component status accordingly.

The tl;dr is that we cannot control the execution of async code, that's
up to the Go scheduler, but we can handle the race, report the status
based on the execution, and not clobber status reported from within the
component during the startup process. That said, for components with
async starts, you may see a `StatusOK` before the component-reported
status, or just the component-reported status depending on the actual
execution of the code. In both cases, the end status will be same.

The work in this PR will allow us to simplify #8684 and #8788 and
ultimately choose which direction we want to go for runtime status
reporting.

**Link to tracking Issue:** #7682

**Testing:** units / manual

---------

Co-authored-by: Alex Boten <aboten@lightstep.com>
This commit is contained in:
Matthew Wear 2023-11-28 12:43:32 -08:00 committed by GitHub
parent 1a4ed9ef16
commit 433f7aef92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 346 additions and 112 deletions

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: statusreporting
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Automates status reporting upon the completion of component.Start().
# One or more tracking issues or pull requests related to the change
issues: [7682]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []

View File

@ -12,6 +12,43 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
)
// TelemetrySettings provides components with APIs to report telemetry.
//
// Note: there is a service version of this struct, servicetelemetry.TelemetrySettings, that mirrors
// this struct with the exception of ReportComponentStatus. When adding or removing anything from
// this struct consider whether or not the same should be done for the service version.
type TelemetrySettings struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Logger *zap.Logger
// TracerProvider that the factory can pass to other instrumented third-party libraries.
TracerProvider trace.TracerProvider
// MeterProvider that the factory can pass to other instrumented third-party libraries.
MeterProvider metric.MeterProvider
// MetricsLevel controls the level of detail for metrics emitted by the collector.
// Experimental: *NOTE* this field is experimental and may be changed or removed.
MetricsLevel configtelemetry.Level
// Resource contains the resource attributes for the collector's telemetry.
Resource pcommon.Resource
// ReportComponentStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown. ReportComponentStatus
// will only return errors if the API used incorrectly. The two scenarios where an error will
// be returned are:
//
// - An illegal state transition
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus StatusFunc
}
// Deprecated: [0.91.0] Use TelemetrySettings directly
type TelemetrySettingsBase[T any] struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
@ -42,7 +79,3 @@ type TelemetrySettingsBase[T any] struct {
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus T
}
// TelemetrySettings and servicetelemetry.Settings differ in the method signature for
// ReportComponentStatus
type TelemetrySettings TelemetrySettingsBase[StatusFunc]

View File

@ -36,10 +36,11 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
c.seenSettings[telemetrySettings] = struct{}{}
prev := c.telemetry.ReportComponentStatus
c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error {
if err := telemetrySettings.ReportComponentStatus(ev); err != nil {
return err
err := telemetrySettings.ReportComponentStatus(ev)
if prevErr := prev(ev); prevErr != nil {
err = prevErr
}
return prev(ev)
return err
}
}
return c, nil

View File

@ -182,11 +182,24 @@ func TestComponentStatusWatcher(t *testing.T) {
// Start the newly created collector.
wg := startCollector(context.Background(), t, col)
// An unhealthy processor asynchronously reports a recoverable error.
expectedStatuses := []component.Status{
// An unhealthy processor asynchronously reports a recoverable error. Depending on the Go
// Scheduler the statuses reported at startup will be one of the two valid sequnces below.
startupStatuses1 := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}
startupStatuses2 := []component.Status{
component.StatusStarting,
component.StatusRecoverableError,
}
// the modulus of the actual statuses will match the modulus of the startup statuses
startupStatuses := func(actualStatuses []component.Status) []component.Status {
if len(actualStatuses)%2 == 1 {
return startupStatuses1
}
return startupStatuses2
}
// The "unhealthy" processors will now begin to asynchronously report StatusRecoverableError.
// We expect to see these reports.
@ -197,8 +210,8 @@ func TestComponentStatusWatcher(t *testing.T) {
for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must have the expected statuses
assert.Equal(t, expectedStatuses, v)
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
@ -212,8 +225,9 @@ func TestComponentStatusWatcher(t *testing.T) {
wg.Wait()
// Check for additional statuses after Shutdown.
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
for _, v := range changedComponents {
expectedStatuses := append([]component.Status{}, startupStatuses(v)...)
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
assert.Equal(t, expectedStatuses, v)
}

View File

@ -37,11 +37,18 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
return err
}
_ = bes.telemetry.Status.ReportComponentOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
@ -55,13 +62,22 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}
return errs

View File

@ -381,6 +381,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "successful startup/shutdown",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
@ -400,6 +401,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "shutdown error",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
@ -430,11 +432,11 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
assert.NoError(t, err)
var actualStatuses []*component.StatusEvent
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
})
extensions.telemetry.ReportComponentStatus = statusFunc
init()
extensions.telemetry.Status = rep
rep.Ready()
assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost()))
if tc.startErr == nil {

View File

@ -386,12 +386,20 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}
instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
return compErr
}
_ = g.telemetry.Status.ReportComponentOKIfStarting(instanceID)
}
return nil
}
@ -417,15 +425,24 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}
instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
continue
}
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}
return errs
}

View File

@ -2163,11 +2163,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
@ -2194,6 +2196,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
@ -2206,11 +2209,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
@ -2223,11 +2228,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
@ -2240,12 +2247,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()
actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
})
pg.telemetry.ReportComponentStatus = statusFunc
init()
pg.telemetry.Status = rep
rep.Ready()
e0, e1 := tc.edge[0], tc.edge[1]
pg.instanceIDs = map[int64]*component.InstanceID{

View File

@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/status"
)
// NewNopTelemetrySettings returns a new nop settings for Create* functions.
@ -21,8 +22,6 @@ func NewNopTelemetrySettings() TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error {
return nil
},
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
}
}

View File

@ -18,7 +18,7 @@ import (
func TestNewNopSettings(t *testing.T) {
set := NewNopTelemetrySettings()
set.Status.Ready()
require.NotNil(t, set)
require.IsType(t, TelemetrySettings{}, set)
require.Equal(t, zap.NewNop(), set.Logger)
@ -26,5 +26,11 @@ func TestNewNopSettings(t *testing.T) {
require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusStarting)))
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
}

View File

@ -4,14 +4,41 @@
package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry"
import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/status"
)
// TelemetrySettings mirrors component.TelemetrySettings except for the method signature of
// ReportComponentStatus. The service level TelemetrySettings is not bound a specific component, and
// therefore takes a component.InstanceID as an argument.
type TelemetrySettings component.TelemetrySettingsBase[status.ServiceStatusFunc]
// TelemetrySettings mirrors component.TelemetrySettings except for the mechanism for reporting
// status. Service-level status reporting has additional methods which can report status for
// components by their InstanceID whereas the component versions are tied to a specific component.
type TelemetrySettings struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Logger *zap.Logger
// TracerProvider that the factory can pass to other instrumented third-party libraries.
TracerProvider trace.TracerProvider
// MeterProvider that the factory can pass to other instrumented third-party libraries.
MeterProvider metric.MeterProvider
// MetricsLevel controls the level of detail for metrics emitted by the collector.
// Experimental: *NOTE* this field is experimental and may be changed or removed.
MetricsLevel configtelemetry.Level
// Resource contains the resource attributes for the collector's telemetry.
Resource pcommon.Resource
// Status contains a Reporter that allows the service to report status on behalf of a
// component.
Status *status.Reporter
}
// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from
// this service level Settings object.
@ -22,6 +49,6 @@ func (s TelemetrySettings) ToComponentTelemetrySettings(id *component.InstanceID
MeterProvider: s.MeterProvider,
MetricsLevel: s.MetricsLevel,
Resource: s.Resource,
ReportComponentStatus: status.NewComponentStatusFunc(id, s.ReportComponentStatus),
ReportComponentStatus: status.NewComponentStatusFunc(id, s.Status.ReportComponentStatus),
}
}

View File

@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/status"
)
func TestSettings(t *testing.T) {
@ -23,12 +24,17 @@ func TestSettings(t *testing.T) {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error {
return nil
},
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
}
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusOK)))
set.Status.Ready()
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
compSet := set.ToComponentTelemetrySettings(&component.InstanceID{})
require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusOK)))
require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)))
}

View File

@ -83,32 +83,6 @@ func newFSM(onTransition onTransitionFunc) *fsm {
}
}
// InitFunc can be used to toggle a ready flag to true
type InitFunc func()
// readFunc can be used to check the value of a ready flag
type readyFunc func() bool
// initAndReadyFuncs returns a pair of functions to set and check a boolean ready flag
func initAndReadyFuncs() (InitFunc, readyFunc) {
mu := sync.RWMutex{}
isReady := false
init := func() {
mu.Lock()
defer mu.Unlock()
isReady = true
}
ready := func() bool {
mu.RLock()
defer mu.RUnlock()
return isReady
}
return init, ready
}
// NotifyStatusFunc is the receiver of status events after successful state transitions
type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent)
@ -118,38 +92,74 @@ type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent) error
// errStatusNotReady is returned when trying to report status before service start
var errStatusNotReady = errors.New("report component status is not ready until service start")
// NewServiceStatusFunc returns a function to be used as ReportComponentStatus for
// servicetelemetry.Settings, which differs from component.TelemetrySettings in that
// the service version does not correspond to a specific component, and thus needs
// the a component.InstanceID as a parameter.
func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) {
init, isReady := initAndReadyFuncs()
// mu synchronizes access to the fsmMap and the underlying fsm during a state transition
mu := sync.Mutex{}
fsmMap := make(map[*component.InstanceID]*fsm)
return init,
func(id *component.InstanceID, ev *component.StatusEvent) error {
if !isReady() {
return errStatusNotReady
}
mu.Lock()
defer mu.Unlock()
fsm, ok := fsmMap[id]
if !ok {
fsm = newFSM(func(ev *component.StatusEvent) {
notifyStatusChange(id, ev)
})
fsmMap[id] = fsm
}
return fsm.transition(ev)
}
// Reporter handles component status reporting
type Reporter struct {
mu sync.Mutex
ready bool
fsmMap map[*component.InstanceID]*fsm
onStatusChange NotifyStatusFunc
}
// NewReporter returns a reporter that will invoke the NotifyStatusFunc when a component's status
// has changed.
func NewReporter(onStatusChange NotifyStatusFunc) *Reporter {
return &Reporter{
fsmMap: make(map[*component.InstanceID]*fsm),
onStatusChange: onStatusChange,
}
}
// Ready enables status reporting
func (r *Reporter) Ready() {
r.mu.Lock()
defer r.mu.Unlock()
r.ready = true
}
// ReportComponentStatus reports status for the given InstanceID
func (r *Reporter) ReportComponentStatus(
id *component.InstanceID,
ev *component.StatusEvent,
) error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.ready {
return errStatusNotReady
}
return r.componentFSM(id).transition(ev)
}
// ReportComponentOkIfStarting reports StatusOK if the component's current status is Starting
func (r *Reporter) ReportComponentOKIfStarting(id *component.InstanceID) error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.ready {
return errStatusNotReady
}
fsm := r.componentFSM(id)
if fsm.current.Status() == component.StatusStarting {
return fsm.transition(component.NewStatusEvent(component.StatusOK))
}
return nil
}
// Note: a lock must be acquired before calling this method.
func (r *Reporter) componentFSM(id *component.InstanceID) *fsm {
fsm, ok := r.fsmMap[id]
if !ok {
fsm = newFSM(func(ev *component.StatusEvent) { r.onStatusChange(id, ev) })
r.fsmMap[id] = fsm
}
return fsm
}
// NewComponentStatusFunc returns a function to be used as ReportComponentStatus for
// component.TelemetrySettings, which differs from servicetelemetry.Settings in that
// the component version is tied to specific component instance.
func NewComponentStatusFunc(id *component.InstanceID, srvStatus ServiceStatusFunc) component.StatusFunc {
func NewComponentStatusFunc(
id *component.InstanceID,
srvStatus ServiceStatusFunc,
) component.StatusFunc {
return func(ev *component.StatusEvent) error {
return srvStatus(id, ev)
}

View File

@ -208,10 +208,10 @@ func TestStatusFuncs(t *testing.T) {
id2: statuses2,
}
init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
comp1Func := NewComponentStatusFunc(id1, serviceStatusFn)
comp2Func := NewComponentStatusFunc(id2, serviceStatusFn)
init()
rep := NewReporter(statusFunc)
comp1Func := NewComponentStatusFunc(id1, rep.ReportComponentStatus)
comp2Func := NewComponentStatusFunc(id2, rep.ReportComponentStatus)
rep.Ready()
for _, st := range statuses1 {
require.NoError(t, comp1Func(component.NewStatusEvent(st)))
@ -230,8 +230,8 @@ func TestStatusFuncsConcurrent(t *testing.T) {
statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) {
count++
}
init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
init()
rep := NewReporter(statusFunc)
rep.Ready()
wg := sync.WaitGroup{}
wg.Add(len(ids))
@ -239,7 +239,7 @@ func TestStatusFuncsConcurrent(t *testing.T) {
for _, id := range ids {
id := id
go func() {
compFn := NewComponentStatusFunc(id, serviceStatusFn)
compFn := NewComponentStatusFunc(id, rep.ReportComponentStatus)
_ = compFn(component.NewStatusEvent(component.StatusStarting))
for i := 0; i < 1000; i++ {
_ = compFn(component.NewStatusEvent(component.StatusRecoverableError))
@ -253,16 +253,90 @@ func TestStatusFuncsConcurrent(t *testing.T) {
require.Equal(t, 8004, count)
}
func TestStatusFuncReady(t *testing.T) {
func TestReporterReady(t *testing.T) {
statusFunc := func(*component.InstanceID, *component.StatusEvent) {}
init, serviceStatusFn := NewServiceStatusFunc(statusFunc)
rep := NewReporter(statusFunc)
id := &component.InstanceID{}
err := serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting))
err := rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
require.ErrorIs(t, err, errStatusNotReady)
init()
rep.Ready()
err = serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting))
err = rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
require.NoError(t, err)
}
func TestReportComponentOKIfStarting(t *testing.T) {
for _, tc := range []struct {
name string
initialStatuses []component.Status
expectedStatuses []component.Status
}{
{
name: "matching condition: StatusStarting",
initialStatuses: []component.Status{
component.StatusStarting,
},
expectedStatuses: []component.Status{
component.StatusStarting,
component.StatusOK,
},
},
{
name: "non-matching condition StatusOK",
initialStatuses: []component.Status{
component.StatusStarting,
component.StatusOK,
},
expectedStatuses: []component.Status{
component.StatusStarting,
component.StatusOK,
},
},
{
name: "non-matching condition RecoverableError",
initialStatuses: []component.Status{
component.StatusStarting,
component.StatusRecoverableError,
},
expectedStatuses: []component.Status{
component.StatusStarting,
component.StatusRecoverableError,
},
},
{
name: "non-matching condition PermanentError",
initialStatuses: []component.Status{
component.StatusStarting,
component.StatusPermanentError,
},
expectedStatuses: []component.Status{
component.StatusStarting,
component.StatusPermanentError,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
var receivedStatuses []component.Status
rep := NewReporter(
func(_ *component.InstanceID, ev *component.StatusEvent) {
receivedStatuses = append(receivedStatuses, ev.Status())
},
)
rep.Ready()
id := &component.InstanceID{}
for _, status := range tc.initialStatuses {
err := rep.ReportComponentStatus(id, component.NewStatusEvent(status))
require.NoError(t, err)
}
err := rep.ReportComponentOKIfStarting(id)
require.NoError(t, err)
require.Equal(t, tc.expectedStatuses, receivedStatuses)
})
}
}

View File

@ -75,7 +75,6 @@ type Service struct {
host *serviceHost
telemetryInitializer *telemetryInitializer
collectorConf *confmap.Conf
statusInit status.InitFunc
}
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
@ -112,9 +111,9 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: cfg.Telemetry.Metrics.Level,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
Status: status.NewReporter(srv.host.notifyComponentStatusChange),
}
if err = srv.telemetryInitializer.init(res, srv.telemetrySettings, cfg.Telemetry, set.AsyncErrorChannel); err != nil {
@ -122,8 +121,6 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp
srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp
srv.statusInit, srv.telemetrySettings.ReportComponentStatus =
status.NewServiceStatusFunc(srv.host.notifyComponentStatusChange)
// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
@ -151,7 +148,7 @@ func (srv *Service) Start(ctx context.Context) error {
)
// enable status reporting
srv.statusInit()
srv.telemetrySettings.Status.Ready()
if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)