168 lines
4.8 KiB
Go
168 lines
4.8 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
// Package sharedcomponent exposes functionality for components
|
|
// to register against a shared key, such as a configuration object, in order to be reused across signal types.
|
|
// This is particularly useful when the component relies on a shared resource such as os.File or http.Server.
|
|
package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent"
|
|
|
|
import (
|
|
"container/ring"
|
|
"context"
|
|
"sync"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/component/componentstatus"
|
|
)
|
|
|
|
func NewMap[K comparable, V component.Component]() *Map[K, V] {
|
|
return &Map[K, V]{
|
|
components: map[K]*Component[V]{},
|
|
}
|
|
}
|
|
|
|
// Map keeps reference of all created instances for a given shared key such as a component configuration.
|
|
type Map[K comparable, V component.Component] struct {
|
|
lock sync.Mutex
|
|
components map[K]*Component[V]
|
|
}
|
|
|
|
// LoadOrStore returns the already created instance if exists, otherwise creates a new instance
|
|
// and adds it to the map of references.
|
|
func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error)) (*Component[V], error) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
if c, ok := m.components[key]; ok {
|
|
return c, nil
|
|
}
|
|
comp, err := create()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
newComp := &Component[V]{
|
|
component: comp,
|
|
removeFunc: func() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
delete(m.components, key)
|
|
},
|
|
}
|
|
m.components[key] = newComp
|
|
return newComp, nil
|
|
}
|
|
|
|
// Component ensures that the wrapped component is started and stopped only once.
|
|
// When stopped it is removed from the Map.
|
|
type Component[V component.Component] struct {
|
|
component V
|
|
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
removeFunc func()
|
|
|
|
hostWrapper *hostWrapper
|
|
}
|
|
|
|
// Unwrap returns the original component.
|
|
func (c *Component[V]) Unwrap() V {
|
|
return c.component
|
|
}
|
|
|
|
// Start starts the underlying component if it never started before.
|
|
func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
|
|
if c.hostWrapper == nil {
|
|
var err error
|
|
c.startOnce.Do(func() {
|
|
c.hostWrapper = &hostWrapper{
|
|
host: host,
|
|
sources: make([]componentstatus.Reporter, 0),
|
|
previousEvents: ring.New(5),
|
|
}
|
|
statusReporter, isStatusReporter := host.(componentstatus.Reporter)
|
|
if isStatusReporter {
|
|
c.hostWrapper.addSource(statusReporter)
|
|
}
|
|
|
|
// It's important that status for a shared component is reported through its
|
|
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
|
|
// and takes priority over the automated status reporting that happens in graph, making the
|
|
// status reporting in graph a no-op.
|
|
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStarting))
|
|
if err = c.component.Start(ctx, c.hostWrapper); err != nil {
|
|
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
statusReporter, isStatusReporter := host.(componentstatus.Reporter)
|
|
if isStatusReporter {
|
|
c.hostWrapper.addSource(statusReporter)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
_ component.Host = (*hostWrapper)(nil)
|
|
_ componentstatus.Reporter = (*hostWrapper)(nil)
|
|
)
|
|
|
|
type hostWrapper struct {
|
|
host component.Host
|
|
sources []componentstatus.Reporter
|
|
previousEvents *ring.Ring
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func (h *hostWrapper) GetExtensions() map[component.ID]component.Component {
|
|
return h.host.GetExtensions()
|
|
}
|
|
|
|
func (h *hostWrapper) Report(e *componentstatus.Event) {
|
|
// Only remember an event if it will be emitted and it has not been sent already.
|
|
h.lock.Lock()
|
|
defer h.lock.Unlock()
|
|
if len(h.sources) > 0 {
|
|
h.previousEvents.Value = e
|
|
h.previousEvents = h.previousEvents.Next()
|
|
}
|
|
for _, s := range h.sources {
|
|
s.Report(e)
|
|
}
|
|
}
|
|
|
|
func (h *hostWrapper) addSource(s componentstatus.Reporter) {
|
|
h.lock.Lock()
|
|
defer h.lock.Unlock()
|
|
h.previousEvents.Do(func(a any) {
|
|
if e, ok := a.(*componentstatus.Event); ok {
|
|
s.Report(e)
|
|
}
|
|
})
|
|
h.sources = append(h.sources, s)
|
|
}
|
|
|
|
// Shutdown shuts down the underlying component.
|
|
func (c *Component[V]) Shutdown(ctx context.Context) error {
|
|
var err error
|
|
c.stopOnce.Do(func() {
|
|
// It's important that status for a shared component is reported through its
|
|
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
|
|
// and takes priority over the automated status reporting that happens in graph, making the
|
|
// status reporting in graph a no-op.
|
|
if c.hostWrapper != nil {
|
|
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopping))
|
|
}
|
|
err = c.component.Shutdown(ctx)
|
|
if c.hostWrapper != nil {
|
|
if err != nil {
|
|
c.hostWrapper.Report(componentstatus.NewPermanentErrorEvent(err))
|
|
} else {
|
|
c.hostWrapper.Report(componentstatus.NewEvent(componentstatus.StatusStopped))
|
|
}
|
|
}
|
|
c.removeFunc()
|
|
})
|
|
return err
|
|
}
|