opentelemetry-collector/service/extensions/extensions.go

249 lines
7.2 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package extensions // import "go.opentelemetry.io/collector/service/extensions"
import (
"context"
"fmt"
"net/http"
"sort"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/service/internal/attribute"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/internal/zpages"
)
const zExtensionName = "zextensionname"
// Extensions is a map of extensions created from extension configs.
type Extensions struct {
telemetry component.TelemetrySettings
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*componentstatus.InstanceID
extensionIDs []component.ID // start order (and reverse stop order)
reporter status.Reporter
}
// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for _, extID := range bes.extensionIDs {
extLogger := componentattribute.ZapLoggerWithAttributes(bes.telemetry.Logger,
*attribute.Extension(extID).Set())
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
bes.reporter.ReportStatus(
instanceID,
componentstatus.NewEvent(componentstatus.StatusStarting),
)
if err := ext.Start(ctx, host); err != nil {
bes.reporter.ReportStatus(
instanceID,
componentstatus.NewPermanentErrorEvent(err),
)
// We log with zap.AddStacktrace(zap.DPanicLevel) to avoid adding the stack trace to the error log
extLogger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error("Failed to start extension", zap.Error(err))
return err
}
bes.reporter.ReportOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
}
// Shutdown stops all extensions.
func (bes *Extensions) Shutdown(ctx context.Context) error {
bes.telemetry.Logger.Info("Stopping extensions...")
var errs error
for i := len(bes.extensionIDs) - 1; i >= 0; i-- {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
bes.reporter.ReportStatus(
instanceID,
componentstatus.NewEvent(componentstatus.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
bes.reporter.ReportStatus(
instanceID,
componentstatus.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
bes.reporter.ReportStatus(
instanceID,
componentstatus.NewEvent(componentstatus.StatusStopped),
)
}
return errs
}
func (bes *Extensions) NotifyPipelineReady() error {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extensioncapabilities.PipelineWatcher); ok {
if err := pw.Ready(); err != nil {
return fmt.Errorf("failed to notify extension %q: %w", extID, err)
}
}
}
return nil
}
func (bes *Extensions) NotifyPipelineNotReady() error {
var errs error
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extensioncapabilities.PipelineWatcher); ok {
errs = multierr.Append(errs, pw.NotReady())
}
}
return errs
}
func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var errs error
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if cw, ok := ext.(extensioncapabilities.ConfigWatcher); ok {
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
}
}
return errs
}
func (bes *Extensions) NotifyComponentStatusChange(source *componentstatus.InstanceID, event *componentstatus.Event) {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if sw, ok := ext.(componentstatus.Watcher); ok {
sw.ComponentStatusChanged(source, event)
}
}
}
func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
result := make(map[component.ID]component.Component, len(bes.extMap))
for extID, v := range bes.extMap {
result[extID] = v
}
return result
}
func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) {
extensionName := r.URL.Query().Get(zExtensionName)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Extensions"})
data := zpages.SummaryExtensionsTableData{}
data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(bes.extMap))
for _, id := range bes.extensionIDs {
row := zpages.SummaryExtensionsTableRowData{FullName: id.String()}
data.Rows = append(data.Rows, row)
}
sort.Slice(data.Rows, func(i, j int) bool {
return data.Rows[i].FullName < data.Rows[j].FullName
})
zpages.WriteHTMLExtensionsSummaryTable(w, data)
if extensionName != "" {
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: extensionName,
})
// TODO: Add config + status info.
}
zpages.WriteHTMLPageFooter(w)
}
// Settings holds configuration for building Extensions.
type Settings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo
// Extensions builder for extensions.
Extensions builders.Extension
}
type Option interface {
apply(*Extensions)
}
type optionFunc func(*Extensions)
func (of optionFunc) apply(e *Extensions) {
of(e)
}
func WithReporter(reporter status.Reporter) Option {
return optionFunc(func(e *Extensions) {
e.reporter = reporter
})
}
// New creates a new Extensions from Config.
func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Extensions, error) {
exts := &Extensions{
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*componentstatus.InstanceID),
extensionIDs: make([]component.ID, 0, len(cfg)),
reporter: &nopReporter{},
}
for _, opt := range options {
opt.apply(exts)
}
for _, extID := range cfg {
instanceID := componentstatus.NewInstanceID(extID, component.KindExtension)
extSet := extension.Settings{
ID: extID,
TelemetrySettings: telemetry.WithAttributeSet(set.Telemetry, *attribute.Extension(extID).Set()),
BuildInfo: set.BuildInfo,
}
ext, err := set.Extensions.Create(ctx, extSet)
if err != nil {
return nil, fmt.Errorf("failed to create extension %q: %w", extID, err)
}
// Check if the factory really created the extension.
if ext == nil {
return nil, fmt.Errorf("factory for %q produced a nil extension", extID)
}
exts.extMap[extID] = ext
exts.instanceIDs[extID] = instanceID
}
order, err := computeOrder(exts)
if err != nil {
return nil, err
}
exts.extensionIDs = order
return exts, nil
}
type nopReporter struct{}
func (r *nopReporter) Ready() {}
func (r *nopReporter) ReportStatus(*componentstatus.InstanceID, *componentstatus.Event) {}
func (r *nopReporter) ReportOKIfStarting(*componentstatus.InstanceID) {}