249 lines
7.2 KiB
Go
249 lines
7.2 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package extensions // import "go.opentelemetry.io/collector/service/extensions"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
|
|
"go.uber.org/multierr"
|
|
"go.uber.org/zap"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/component/componentstatus"
|
|
"go.opentelemetry.io/collector/confmap"
|
|
"go.opentelemetry.io/collector/extension"
|
|
"go.opentelemetry.io/collector/extension/extensioncapabilities"
|
|
"go.opentelemetry.io/collector/internal/telemetry"
|
|
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
|
|
"go.opentelemetry.io/collector/service/internal/attribute"
|
|
"go.opentelemetry.io/collector/service/internal/builders"
|
|
"go.opentelemetry.io/collector/service/internal/status"
|
|
"go.opentelemetry.io/collector/service/internal/zpages"
|
|
)
|
|
|
|
// zExtensionName is the URL query parameter that HandleZPages reads to decide
// which extension's component header to render.
const zExtensionName = "zextensionname"
|
|
|
|
// Extensions is a map of extensions created from extension configs.
|
|
type Extensions struct {
|
|
telemetry component.TelemetrySettings
|
|
extMap map[component.ID]extension.Extension
|
|
instanceIDs map[component.ID]*componentstatus.InstanceID
|
|
extensionIDs []component.ID // start order (and reverse stop order)
|
|
reporter status.Reporter
|
|
}
|
|
|
|
// Start starts all extensions.
|
|
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
|
|
bes.telemetry.Logger.Info("Starting extensions...")
|
|
for _, extID := range bes.extensionIDs {
|
|
extLogger := componentattribute.ZapLoggerWithAttributes(bes.telemetry.Logger,
|
|
*attribute.Extension(extID).Set())
|
|
extLogger.Info("Extension is starting...")
|
|
instanceID := bes.instanceIDs[extID]
|
|
ext := bes.extMap[extID]
|
|
bes.reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStarting),
|
|
)
|
|
if err := ext.Start(ctx, host); err != nil {
|
|
bes.reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewPermanentErrorEvent(err),
|
|
)
|
|
// We log with zap.AddStacktrace(zap.DPanicLevel) to avoid adding the stack trace to the error log
|
|
extLogger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error("Failed to start extension", zap.Error(err))
|
|
return err
|
|
}
|
|
bes.reporter.ReportOKIfStarting(instanceID)
|
|
extLogger.Info("Extension started.")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Shutdown stops all extensions.
|
|
func (bes *Extensions) Shutdown(ctx context.Context) error {
|
|
bes.telemetry.Logger.Info("Stopping extensions...")
|
|
var errs error
|
|
for i := len(bes.extensionIDs) - 1; i >= 0; i-- {
|
|
extID := bes.extensionIDs[i]
|
|
instanceID := bes.instanceIDs[extID]
|
|
ext := bes.extMap[extID]
|
|
bes.reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStopping),
|
|
)
|
|
if err := ext.Shutdown(ctx); err != nil {
|
|
bes.reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewPermanentErrorEvent(err),
|
|
)
|
|
errs = multierr.Append(errs, err)
|
|
continue
|
|
}
|
|
bes.reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStopped),
|
|
)
|
|
}
|
|
|
|
return errs
|
|
}
|
|
|
|
func (bes *Extensions) NotifyPipelineReady() error {
|
|
for _, extID := range bes.extensionIDs {
|
|
ext := bes.extMap[extID]
|
|
if pw, ok := ext.(extensioncapabilities.PipelineWatcher); ok {
|
|
if err := pw.Ready(); err != nil {
|
|
return fmt.Errorf("failed to notify extension %q: %w", extID, err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bes *Extensions) NotifyPipelineNotReady() error {
|
|
var errs error
|
|
for _, extID := range bes.extensionIDs {
|
|
ext := bes.extMap[extID]
|
|
if pw, ok := ext.(extensioncapabilities.PipelineWatcher); ok {
|
|
errs = multierr.Append(errs, pw.NotReady())
|
|
}
|
|
}
|
|
return errs
|
|
}
|
|
|
|
func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
|
|
var errs error
|
|
for _, extID := range bes.extensionIDs {
|
|
ext := bes.extMap[extID]
|
|
if cw, ok := ext.(extensioncapabilities.ConfigWatcher); ok {
|
|
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
|
|
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
|
|
}
|
|
}
|
|
return errs
|
|
}
|
|
|
|
func (bes *Extensions) NotifyComponentStatusChange(source *componentstatus.InstanceID, event *componentstatus.Event) {
|
|
for _, extID := range bes.extensionIDs {
|
|
ext := bes.extMap[extID]
|
|
if sw, ok := ext.(componentstatus.Watcher); ok {
|
|
sw.ComponentStatusChanged(source, event)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
|
|
result := make(map[component.ID]component.Component, len(bes.extMap))
|
|
for extID, v := range bes.extMap {
|
|
result[extID] = v
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) {
|
|
extensionName := r.URL.Query().Get(zExtensionName)
|
|
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Extensions"})
|
|
data := zpages.SummaryExtensionsTableData{}
|
|
|
|
data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(bes.extMap))
|
|
for _, id := range bes.extensionIDs {
|
|
row := zpages.SummaryExtensionsTableRowData{FullName: id.String()}
|
|
data.Rows = append(data.Rows, row)
|
|
}
|
|
|
|
sort.Slice(data.Rows, func(i, j int) bool {
|
|
return data.Rows[i].FullName < data.Rows[j].FullName
|
|
})
|
|
zpages.WriteHTMLExtensionsSummaryTable(w, data)
|
|
if extensionName != "" {
|
|
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
|
|
Name: extensionName,
|
|
})
|
|
// TODO: Add config + status info.
|
|
}
|
|
zpages.WriteHTMLPageFooter(w)
|
|
}
|
|
|
|
// Settings holds configuration for building Extensions.
|
|
type Settings struct {
|
|
Telemetry component.TelemetrySettings
|
|
BuildInfo component.BuildInfo
|
|
|
|
// Extensions builder for extensions.
|
|
Extensions builders.Extension
|
|
}
|
|
|
|
type Option interface {
|
|
apply(*Extensions)
|
|
}
|
|
|
|
type optionFunc func(*Extensions)
|
|
|
|
func (of optionFunc) apply(e *Extensions) {
|
|
of(e)
|
|
}
|
|
|
|
func WithReporter(reporter status.Reporter) Option {
|
|
return optionFunc(func(e *Extensions) {
|
|
e.reporter = reporter
|
|
})
|
|
}
|
|
|
|
// New creates a new Extensions from Config.
|
|
func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Extensions, error) {
|
|
exts := &Extensions{
|
|
telemetry: set.Telemetry,
|
|
extMap: make(map[component.ID]extension.Extension),
|
|
instanceIDs: make(map[component.ID]*componentstatus.InstanceID),
|
|
extensionIDs: make([]component.ID, 0, len(cfg)),
|
|
reporter: &nopReporter{},
|
|
}
|
|
|
|
for _, opt := range options {
|
|
opt.apply(exts)
|
|
}
|
|
|
|
for _, extID := range cfg {
|
|
instanceID := componentstatus.NewInstanceID(extID, component.KindExtension)
|
|
extSet := extension.Settings{
|
|
ID: extID,
|
|
TelemetrySettings: telemetry.WithAttributeSet(set.Telemetry, *attribute.Extension(extID).Set()),
|
|
BuildInfo: set.BuildInfo,
|
|
}
|
|
|
|
ext, err := set.Extensions.Create(ctx, extSet)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create extension %q: %w", extID, err)
|
|
}
|
|
|
|
// Check if the factory really created the extension.
|
|
if ext == nil {
|
|
return nil, fmt.Errorf("factory for %q produced a nil extension", extID)
|
|
}
|
|
|
|
exts.extMap[extID] = ext
|
|
exts.instanceIDs[extID] = instanceID
|
|
}
|
|
order, err := computeOrder(exts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
exts.extensionIDs = order
|
|
return exts, nil
|
|
}
|
|
|
|
type nopReporter struct{}
|
|
|
|
func (r *nopReporter) Ready() {}
|
|
|
|
func (r *nopReporter) ReportStatus(*componentstatus.InstanceID, *componentstatus.Event) {}
|
|
|
|
func (r *nopReporter) ReportOKIfStarting(*componentstatus.InstanceID) {}
|