Minimum changes to component.Host, to allow split of the component package (#6553)
* Draft minimum changes to component.Host Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> * Update .chloggen/draftminhost.yaml Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com> Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
This commit is contained in:
parent
4565692c50
commit
aaaa356a38
|
|
@ -0,0 +1,11 @@
|
|||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: deprecation
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: component
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: Deprecate `component.Receiver`, `component.Processor`, and `component.Exporter`.
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [6553]
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: enhancement
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: component
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: "`component.Extension` is temporarily set to be an alias of `component.Component` which will be reverted once it's moved to the `extension` package. Change your `component.Host.GetExtensions()` implementation to return `map[ID]component.Component` instead of `map[ID]component.Extension`"
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [6553]
|
||||
|
|
@ -32,10 +32,10 @@ func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Fact
|
|||
return nil
|
||||
}
|
||||
|
||||
func (nh *nopHost) GetExtensions() map[component.ID]component.Extension {
|
||||
func (nh *nopHost) GetExtensions() map[component.ID]component.Component {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter {
|
||||
func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Component {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,26 +34,24 @@ func UnmarshalExporterConfig(conf *confmap.Conf, cfg ExporterConfig) error {
|
|||
return unmarshal(conf, cfg)
|
||||
}
|
||||
|
||||
// Exporter exports telemetry data from the collector to a destination.
|
||||
type Exporter interface {
|
||||
Component
|
||||
}
|
||||
// Deprecated: [v0.65.0] unnecessary interface, will be removed.
|
||||
type Exporter = Component
|
||||
|
||||
// TracesExporter is an Exporter that can consume traces.
|
||||
type TracesExporter interface {
|
||||
Exporter
|
||||
Component
|
||||
consumer.Traces
|
||||
}
|
||||
|
||||
// MetricsExporter is an Exporter that can consume metrics.
|
||||
type MetricsExporter interface {
|
||||
Exporter
|
||||
Component
|
||||
consumer.Metrics
|
||||
}
|
||||
|
||||
// LogsExporter is an Exporter that can consume logs.
|
||||
type LogsExporter interface {
|
||||
Exporter
|
||||
Component
|
||||
consumer.Logs
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,9 +36,7 @@ func UnmarshalExtensionConfig(conf *confmap.Conf, cfg ExtensionConfig) error {
|
|||
// Extension is the interface for objects hosted by the OpenTelemetry Collector that
|
||||
// don't participate directly on data pipelines but provide some functionality
|
||||
// to the service, examples: health check endpoint, z-pages, etc.
|
||||
type Extension interface {
|
||||
Component
|
||||
}
|
||||
type Extension = Component
|
||||
|
||||
// PipelineWatcher is an extra interface for Extension hosted by the OpenTelemetry
|
||||
// Collector that is to be implemented by extensions interested in changes to pipeline
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ type Host interface {
|
|||
//
|
||||
// GetExtensions can be called by the component anytime after Component.Start() begins and
|
||||
// until Component.Shutdown() ends.
|
||||
GetExtensions() map[ID]Extension
|
||||
GetExtensions() map[ID]Component
|
||||
|
||||
// GetExporters returns the map of exporters. Only enabled and created exporters will be returned.
|
||||
// Typically is used to find exporters by type or by full config name. Both cases
|
||||
|
|
@ -58,5 +58,5 @@ type Host interface {
|
|||
//
|
||||
// GetExporters can be called by the component anytime after Component.Start() begins and
|
||||
// until Component.Shutdown() ends.
|
||||
GetExporters() map[DataType]map[ID]Exporter
|
||||
GetExporters() map[DataType]map[ID]Component
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,27 +34,24 @@ func UnmarshalProcessorConfig(conf *confmap.Conf, cfg ProcessorConfig) error {
|
|||
return unmarshal(conf, cfg)
|
||||
}
|
||||
|
||||
// Processor defines the common functions that must be implemented by TracesProcessor
|
||||
// and MetricsProcessor.
|
||||
type Processor interface {
|
||||
Component
|
||||
}
|
||||
// Deprecated: [v0.65.0] unnecessary interface, will be removed.
|
||||
type Processor = Component
|
||||
|
||||
// TracesProcessor is a processor that can consume traces.
|
||||
type TracesProcessor interface {
|
||||
Processor
|
||||
Component
|
||||
consumer.Traces
|
||||
}
|
||||
|
||||
// MetricsProcessor is a processor that can consume metrics.
|
||||
type MetricsProcessor interface {
|
||||
Processor
|
||||
Component
|
||||
consumer.Metrics
|
||||
}
|
||||
|
||||
// LogsProcessor is a processor that can consume logs.
|
||||
type LogsProcessor interface {
|
||||
Processor
|
||||
Component
|
||||
consumer.Logs
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,9 +72,8 @@ func UnmarshalReceiverConfig(conf *confmap.Conf, cfg ReceiverConfig) error {
|
|||
//
|
||||
// This ensures there are strong delivery guarantees once the data is acknowledged
|
||||
// by the Collector.
|
||||
type Receiver interface {
|
||||
Component
|
||||
}
|
||||
// Deprecated: [v0.65.0] unnecessary interface, will be removed.
|
||||
type Receiver = Component
|
||||
|
||||
// A TracesReceiver receives traces.
|
||||
// Its purpose is to translate data from any format to the collector's internal trace format.
|
||||
|
|
@ -82,7 +81,7 @@ type Receiver interface {
|
|||
//
|
||||
// For example it could be Zipkin data source which translates Zipkin spans into ptrace.Traces.
|
||||
type TracesReceiver interface {
|
||||
Receiver
|
||||
Component
|
||||
}
|
||||
|
||||
// A MetricsReceiver receives metrics.
|
||||
|
|
@ -91,7 +90,7 @@ type TracesReceiver interface {
|
|||
//
|
||||
// For example it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics.
|
||||
type MetricsReceiver interface {
|
||||
Receiver
|
||||
Component
|
||||
}
|
||||
|
||||
// A LogsReceiver receives logs.
|
||||
|
|
@ -100,7 +99,7 @@ type MetricsReceiver interface {
|
|||
//
|
||||
// For example a LogsReceiver can read syslogs and convert them into plog.Logs.
|
||||
type LogsReceiver interface {
|
||||
Receiver
|
||||
Component
|
||||
}
|
||||
|
||||
// ReceiverCreateSettings configures Receiver creators.
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ type Authentication struct {
|
|||
|
||||
// GetServerAuthenticator attempts to select the appropriate ServerAuthenticator from the list of extensions,
|
||||
// based on the requested extension name. If an authenticator is not found, an error is returned.
|
||||
func (a Authentication) GetServerAuthenticator(extensions map[component.ID]component.Extension) (ServerAuthenticator, error) {
|
||||
func (a Authentication) GetServerAuthenticator(extensions map[component.ID]component.Component) (ServerAuthenticator, error) {
|
||||
if ext, found := extensions[a.AuthenticatorID]; found {
|
||||
if auth, ok := ext.(ServerAuthenticator); ok {
|
||||
return auth, nil
|
||||
|
|
@ -49,7 +49,7 @@ func (a Authentication) GetServerAuthenticator(extensions map[component.ID]compo
|
|||
// GetClientAuthenticator attempts to select the appropriate ClientAuthenticator from the list of extensions,
|
||||
// based on the component id of the extension. If an authenticator is not found, an error is returned.
|
||||
// This should be only used by HTTP clients.
|
||||
func (a Authentication) GetClientAuthenticator(extensions map[component.ID]component.Extension) (ClientAuthenticator, error) {
|
||||
func (a Authentication) GetClientAuthenticator(extensions map[component.ID]component.Component) (ClientAuthenticator, error) {
|
||||
if ext, found := extensions[a.AuthenticatorID]; found {
|
||||
if auth, ok := ext.(ClientAuthenticator); ok {
|
||||
return auth, nil
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ func TestGetServerAuthenticator(t *testing.T) {
|
|||
cfg := &Authentication{
|
||||
AuthenticatorID: component.NewID("mock"),
|
||||
}
|
||||
ext := map[component.ID]component.Extension{
|
||||
ext := map[component.ID]component.Component{
|
||||
component.NewID("mock"): tC.authenticator,
|
||||
}
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ func TestGetServerAuthenticatorFails(t *testing.T) {
|
|||
AuthenticatorID: component.NewID("does-not-exist"),
|
||||
}
|
||||
|
||||
authenticator, err := cfg.GetServerAuthenticator(map[component.ID]component.Extension{})
|
||||
authenticator, err := cfg.GetServerAuthenticator(map[component.ID]component.Component{})
|
||||
assert.ErrorIs(t, err, errAuthenticatorNotFound)
|
||||
assert.Nil(t, authenticator)
|
||||
}
|
||||
|
|
@ -96,7 +96,7 @@ func TestGetClientAuthenticator(t *testing.T) {
|
|||
cfg := &Authentication{
|
||||
AuthenticatorID: component.NewID("mock"),
|
||||
}
|
||||
ext := map[component.ID]component.Extension{
|
||||
ext := map[component.ID]component.Component{
|
||||
component.NewID("mock"): tC.authenticator,
|
||||
}
|
||||
|
||||
|
|
@ -118,7 +118,7 @@ func TestGetClientAuthenticatorFails(t *testing.T) {
|
|||
cfg := &Authentication{
|
||||
AuthenticatorID: component.NewID("does-not-exist"),
|
||||
}
|
||||
authenticator, err := cfg.GetClientAuthenticator(map[component.ID]component.Extension{})
|
||||
authenticator, err := cfg.GetClientAuthenticator(map[component.ID]component.Component{})
|
||||
assert.ErrorIs(t, err, errAuthenticatorNotFound)
|
||||
assert.Nil(t, authenticator)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
|
|||
Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")},
|
||||
},
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("testauth"): &configauth.MockClientAuthenticator{},
|
||||
},
|
||||
},
|
||||
|
|
@ -119,7 +119,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
|
|||
Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")},
|
||||
},
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("testauth"): &configauth.MockClientAuthenticator{},
|
||||
},
|
||||
},
|
||||
|
|
@ -147,7 +147,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
|
|||
Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")},
|
||||
},
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("testauth"): &configauth.MockClientAuthenticator{},
|
||||
},
|
||||
},
|
||||
|
|
@ -216,7 +216,7 @@ func TestGrpcServerAuthSettings(t *testing.T) {
|
|||
AuthenticatorID: component.NewID("mock"),
|
||||
}
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): configauth.NewServerAuthenticator(),
|
||||
},
|
||||
}
|
||||
|
|
@ -295,7 +295,7 @@ func TestGRPCClientSettingsError(t *testing.T) {
|
|||
Endpoint: "localhost:1234",
|
||||
Auth: &configauth.Authentication{AuthenticatorID: component.NewID("doesntexist")},
|
||||
},
|
||||
host: &mockHost{ext: map[component.ID]component.Extension{}},
|
||||
host: &mockHost{ext: map[component.ID]component.Component{}},
|
||||
},
|
||||
{
|
||||
err: "no extensions configuration available",
|
||||
|
|
@ -1075,9 +1075,9 @@ func tempSocketName(t *testing.T) string {
|
|||
|
||||
type mockHost struct {
|
||||
component.Host
|
||||
ext map[component.ID]component.Extension
|
||||
ext map[component.ID]component.Component
|
||||
}
|
||||
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Extension {
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
|
||||
return nh.ext
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func (c *customRoundTripper) RoundTrip(request *http.Request) (*http.Response, e
|
|||
|
||||
func TestAllHTTPClientSettings(t *testing.T) {
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("testauth"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}},
|
||||
},
|
||||
}
|
||||
|
|
@ -160,7 +160,7 @@ func TestAllHTTPClientSettings(t *testing.T) {
|
|||
|
||||
func TestPartialHTTPClientSettings(t *testing.T) {
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("testauth"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}},
|
||||
},
|
||||
}
|
||||
|
|
@ -211,7 +211,7 @@ func TestDefaultHTTPClientSettings(t *testing.T) {
|
|||
|
||||
func TestHTTPClientSettingsError(t *testing.T) {
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{},
|
||||
ext: map[component.ID]component.Component{},
|
||||
}
|
||||
tests := []struct {
|
||||
settings HTTPClientSettings
|
||||
|
|
@ -274,7 +274,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) {
|
|||
},
|
||||
shouldErr: false,
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): &configauth.MockClientAuthenticator{
|
||||
ResultRoundTripper: &customRoundTripper{},
|
||||
},
|
||||
|
|
@ -289,7 +289,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) {
|
|||
},
|
||||
shouldErr: true,
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}},
|
||||
},
|
||||
},
|
||||
|
|
@ -311,7 +311,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) {
|
|||
},
|
||||
shouldErr: false,
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}},
|
||||
},
|
||||
},
|
||||
|
|
@ -324,7 +324,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) {
|
|||
},
|
||||
shouldErr: true,
|
||||
host: &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): &configauth.MockClientAuthenticator{
|
||||
ResultRoundTripper: &customRoundTripper{}, MustError: true},
|
||||
},
|
||||
|
|
@ -737,7 +737,7 @@ func TestHttpCorsWithAuthentication(t *testing.T) {
|
|||
}
|
||||
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): configauth.NewServerAuthenticator(
|
||||
configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) {
|
||||
return ctx, errors.New("authentication failed")
|
||||
|
|
@ -932,7 +932,7 @@ func TestServerAuth(t *testing.T) {
|
|||
}
|
||||
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): configauth.NewServerAuthenticator(
|
||||
configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) {
|
||||
authCalled = true
|
||||
|
|
@ -979,7 +979,7 @@ func TestFailedServerAuth(t *testing.T) {
|
|||
},
|
||||
}
|
||||
host := &mockHost{
|
||||
ext: map[component.ID]component.Extension{
|
||||
ext: map[component.ID]component.Component{
|
||||
component.NewID("mock"): configauth.NewServerAuthenticator(
|
||||
configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) {
|
||||
return ctx, errors.New("authentication failed")
|
||||
|
|
@ -1002,10 +1002,10 @@ func TestFailedServerAuth(t *testing.T) {
|
|||
|
||||
type mockHost struct {
|
||||
component.Host
|
||||
ext map[component.ID]component.Extension
|
||||
ext map[component.ID]component.Component
|
||||
}
|
||||
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Extension {
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
|
||||
return nh.ext
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue
|
|||
return qrs
|
||||
}
|
||||
|
||||
func getStorageExtension(extensions map[component.ID]component.Extension, storageID component.ID) (storage.Extension, error) {
|
||||
func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) {
|
||||
if ext, found := extensions[storageID]; found {
|
||||
if storageExt, ok := ext.(storage.Extension); ok {
|
||||
return storageExt, nil
|
||||
|
|
|
|||
|
|
@ -435,7 +435,7 @@ func TestGetRetrySettings(t *testing.T) {
|
|||
t.Run(tC.desc, func(t *testing.T) {
|
||||
storageID := component.NewIDWithName("file_storage", strconv.Itoa(tC.storageIndex))
|
||||
|
||||
var extensions = map[component.ID]component.Extension{}
|
||||
var extensions = map[component.ID]component.Component{}
|
||||
for i := 0; i < tC.numStorages; i++ {
|
||||
extensions[component.NewIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
|
||||
}
|
||||
|
|
@ -466,7 +466,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {
|
|||
settings := componenttest.NewNopExtensionCreateSettings()
|
||||
extension, err := factory.CreateExtension(context.Background(), settings, extConfig)
|
||||
assert.NoError(t, err)
|
||||
var extensions = map[component.ID]component.Extension{
|
||||
var extensions = map[component.ID]component.Component{
|
||||
storageID: extension,
|
||||
}
|
||||
host := &mockHost{ext: extensions}
|
||||
|
|
@ -545,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
|
|||
be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
|
||||
require.NoError(t, err)
|
||||
|
||||
var extensions = map[component.ID]component.Extension{
|
||||
var extensions = map[component.ID]component.Component{
|
||||
storageID: &mockStorageExtension{},
|
||||
}
|
||||
host := &mockHost{ext: extensions}
|
||||
|
|
@ -568,7 +568,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
|
|||
be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
|
||||
require.NoError(t, err)
|
||||
|
||||
var extensions = map[component.ID]component.Extension{
|
||||
var extensions = map[component.ID]component.Component{
|
||||
storageID: &mockStorageExtension{GetClientError: storageError},
|
||||
}
|
||||
host := &mockHost{ext: extensions}
|
||||
|
|
@ -746,10 +746,10 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met
|
|||
|
||||
type mockHost struct {
|
||||
component.Host
|
||||
ext map[component.ID]component.Extension
|
||||
ext map[component.ID]component.Component
|
||||
}
|
||||
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Extension {
|
||||
func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
|
||||
return nh.ext
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -430,8 +430,8 @@ type host struct {
|
|||
component.Host
|
||||
}
|
||||
|
||||
func (h *host) GetExtensions() map[component.ID]component.Extension {
|
||||
ret := make(map[component.ID]component.Extension)
|
||||
func (h *host) GetExtensions() map[component.ID]component.Component {
|
||||
ret := make(map[component.ID]component.Component)
|
||||
ret[component.NewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize}
|
||||
return ret
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ func NewScraperControllerReceiver(
|
|||
set component.ReceiverCreateSettings,
|
||||
nextConsumer consumer.Metrics,
|
||||
options ...ScraperControllerOption,
|
||||
) (component.Receiver, error) {
|
||||
) (component.Component, error) {
|
||||
if nextConsumer == nil {
|
||||
return nil, component.ErrNilNextConsumer
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,8 +83,8 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
|
|||
return errs
|
||||
}
|
||||
|
||||
func (bes *Extensions) GetExtensions() map[component.ID]component.Extension {
|
||||
result := make(map[component.ID]component.Extension, len(bes.extMap))
|
||||
func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
|
||||
result := make(map[component.ID]component.Component, len(bes.extMap))
|
||||
for extID, v := range bes.extMap {
|
||||
result[extID] = v
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,10 +52,10 @@ func (host *serviceHost) GetFactory(kind component.Kind, componentType component
|
|||
return nil
|
||||
}
|
||||
|
||||
func (host *serviceHost) GetExtensions() map[component.ID]component.Extension {
|
||||
func (host *serviceHost) GetExtensions() map[component.ID]component.Component {
|
||||
return host.extensions.GetExtensions()
|
||||
}
|
||||
|
||||
func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter {
|
||||
func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component {
|
||||
return host.pipelines.GetExporters()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,8 +59,8 @@ type builtPipeline struct {
|
|||
type Pipelines struct {
|
||||
telemetry component.TelemetrySettings
|
||||
|
||||
allReceivers map[component.DataType]map[component.ID]component.Receiver
|
||||
allExporters map[component.DataType]map[component.ID]component.Exporter
|
||||
allReceivers map[component.DataType]map[component.ID]component.Component
|
||||
allExporters map[component.DataType]map[component.ID]component.Component
|
||||
|
||||
pipelines map[component.ID]*builtPipeline
|
||||
}
|
||||
|
|
@ -139,12 +139,12 @@ func (bps *Pipelines) ShutdownAll(ctx context.Context) error {
|
|||
return errs
|
||||
}
|
||||
|
||||
func (bps *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Exporter {
|
||||
exportersMap := make(map[component.DataType]map[component.ID]component.Exporter)
|
||||
func (bps *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Component {
|
||||
exportersMap := make(map[component.DataType]map[component.ID]component.Component)
|
||||
|
||||
exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeTraces]))
|
||||
exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeMetrics]))
|
||||
exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeLogs]))
|
||||
exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeTraces]))
|
||||
exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeMetrics]))
|
||||
exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeLogs]))
|
||||
|
||||
for dt, expByID := range bps.allExporters {
|
||||
for expID, exp := range expByID {
|
||||
|
|
@ -208,8 +208,8 @@ type Settings struct {
|
|||
func Build(ctx context.Context, set Settings) (*Pipelines, error) {
|
||||
exps := &Pipelines{
|
||||
telemetry: set.Telemetry,
|
||||
allReceivers: make(map[component.DataType]map[component.ID]component.Receiver),
|
||||
allExporters: make(map[component.DataType]map[component.ID]component.Exporter),
|
||||
allReceivers: make(map[component.DataType]map[component.ID]component.Component),
|
||||
allExporters: make(map[component.DataType]map[component.ID]component.Component),
|
||||
pipelines: make(map[component.ID]*builtPipeline, len(set.PipelineConfigs)),
|
||||
}
|
||||
|
||||
|
|
@ -221,7 +221,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) {
|
|||
for pipelineID, pipeline := range set.PipelineConfigs {
|
||||
// The data type of the pipeline defines what data type each exporter is expected to receive.
|
||||
if _, ok := exps.allExporters[pipelineID.Type()]; !ok {
|
||||
exps.allExporters[pipelineID.Type()] = make(map[component.ID]component.Exporter)
|
||||
exps.allExporters[pipelineID.Type()] = make(map[component.ID]component.Component)
|
||||
}
|
||||
expByID := exps.allExporters[pipelineID.Type()]
|
||||
|
||||
|
|
@ -306,7 +306,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) {
|
|||
for pipelineID, pipeline := range set.PipelineConfigs {
|
||||
// The data type of the pipeline defines what data type each exporter is expected to receive.
|
||||
if _, ok := exps.allReceivers[pipelineID.Type()]; !ok {
|
||||
exps.allReceivers[pipelineID.Type()] = make(map[component.ID]component.Receiver)
|
||||
exps.allReceivers[pipelineID.Type()] = make(map[component.ID]component.Component)
|
||||
}
|
||||
recvByID := exps.allReceivers[pipelineID.Type()]
|
||||
bp := exps.pipelines[pipelineID]
|
||||
|
|
@ -339,7 +339,7 @@ func buildExporter(
|
|||
factories map[component.Type]component.ExporterFactory,
|
||||
id component.ID,
|
||||
pipelineID component.ID,
|
||||
) (component.Exporter, error) {
|
||||
) (component.Component, error) {
|
||||
cfg, existsCfg := cfgs[id]
|
||||
if !existsCfg {
|
||||
return nil, fmt.Errorf("exporter %q is not configured", id)
|
||||
|
|
@ -365,7 +365,7 @@ func buildExporter(
|
|||
return exp, nil
|
||||
}
|
||||
|
||||
func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id component.ID, pipelineID component.ID, factory component.ExporterFactory) (component.Exporter, error) {
|
||||
func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id component.ID, pipelineID component.ID, factory component.ExporterFactory) (component.Component, error) {
|
||||
switch pipelineID.Type() {
|
||||
case component.DataTypeTraces:
|
||||
return factory.CreateTracesExporter(ctx, set, cfg)
|
||||
|
|
@ -433,7 +433,7 @@ func buildProcessor(ctx context.Context,
|
|||
id component.ID,
|
||||
pipelineID component.ID,
|
||||
next baseConsumer,
|
||||
) (component.Processor, error) {
|
||||
) (component.Component, error) {
|
||||
procCfg, existsCfg := cfgs[id]
|
||||
if !existsCfg {
|
||||
return nil, fmt.Errorf("processor %q is not configured", id)
|
||||
|
|
@ -458,7 +458,7 @@ func buildProcessor(ctx context.Context,
|
|||
return proc, nil
|
||||
}
|
||||
|
||||
func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id component.ID, pipelineID component.ID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) {
|
||||
func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id component.ID, pipelineID component.ID, next baseConsumer, factory component.ProcessorFactory) (component.Component, error) {
|
||||
switch pipelineID.Type() {
|
||||
case component.DataTypeTraces:
|
||||
return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces))
|
||||
|
|
@ -499,7 +499,7 @@ func buildReceiver(ctx context.Context,
|
|||
id component.ID,
|
||||
pipelineID component.ID,
|
||||
nexts []baseConsumer,
|
||||
) (component.Receiver, error) {
|
||||
) (component.Component, error) {
|
||||
cfg, existsCfg := cfgs[id]
|
||||
if !existsCfg {
|
||||
return nil, fmt.Errorf("receiver %q is not configured", id)
|
||||
|
|
@ -525,7 +525,7 @@ func buildReceiver(ctx context.Context,
|
|||
return recv, nil
|
||||
}
|
||||
|
||||
func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) {
|
||||
func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Component, error) {
|
||||
switch pipelineID.Type() {
|
||||
case component.DataTypeTraces:
|
||||
var consumers []consumer.Traces
|
||||
|
|
|
|||
Loading…
Reference in New Issue