Deprecate processorhelperprofiles module in favor of xprocessorhelper (#11889)

to allow adding more experimental data types

Updates
https://github.com/open-telemetry/opentelemetry-collector/issues/11778
This commit is contained in:
Dmitrii Anoshin 2024-12-13 20:46:51 -08:00 committed by GitHub
parent 97f6c3a0aa
commit 7941703056
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 443 additions and 99 deletions

View File

@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate processorhelperprofiles module in favor of xprocessorhelper to allow adding more experimental data types.
# One or more tracking issues or pull requests related to the change
issues: [11778]
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]

View File

@ -8,11 +8,10 @@ require (
go.opentelemetry.io/collector/component/componenttest v0.115.0
go.opentelemetry.io/collector/consumer v1.21.0
go.opentelemetry.io/collector/consumer/consumertest v0.115.0
go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234
go.opentelemetry.io/collector/pdata/pprofile v0.115.0
go.opentelemetry.io/collector/processor v0.115.0
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.0.0-00010101000000-000000000000
go.opentelemetry.io/collector/processor/processortest v0.115.0
go.opentelemetry.io/collector/processor/xprocessor v0.115.0
)
require (
@ -27,9 +26,11 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.115.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 // indirect
go.opentelemetry.io/collector/pdata v1.21.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.115.0 // indirect
go.opentelemetry.io/collector/pipeline v0.115.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.115.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
@ -73,3 +74,5 @@ replace go.opentelemetry.io/collector/component/componentstatus => ../../../comp
replace go.opentelemetry.io/collector/processor/processortest => ../../processortest
replace go.opentelemetry.io/collector/processor/xprocessor => ../../xprocessor
replace go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper => ../xprocessorhelper

View File

@ -1,64 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Deprecated: [0.116.0] Use go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper instead.
package processorhelperprofiles // import "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)
import "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
// Option apply changes to internalOptions.
type Option interface {
apply(*baseSettings)
}
type optionFunc func(*baseSettings)
func (of optionFunc) apply(e *baseSettings) {
of(e)
}
// Deprecated: [0.116.0] Use xprocessorhelper.Option instead.
type Option = xprocessorhelper.Option
// WithStart overrides the default Start function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return optionFunc(func(o *baseSettings) {
o.StartFunc = start
})
}
// Deprecated: [0.116.0] Use xprocessorhelper.WithStart instead.
var WithStart = xprocessorhelper.WithStart
// WithShutdown overrides the default Shutdown function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return optionFunc(func(o *baseSettings) {
o.ShutdownFunc = shutdown
})
}
// Deprecated: [0.116.0] Use xprocessorhelper.WithShutdown instead.
var WithShutdown = xprocessorhelper.WithShutdown
// WithCapabilities overrides the default GetCapabilities function for an processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return optionFunc(func(o *baseSettings) {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
})
}
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
}
// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
consumerOptions: []consumer.Option{consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})},
}
for _, op := range options {
op.apply(opts)
}
return opts
}
// Deprecated: [0.116.0] Use xprocessorhelper.WithCapabilities instead.
var WithCapabilities = xprocessorhelper.WithCapabilities

View File

@ -1,61 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Deprecated: [0.116.0] Use go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper instead.
package processorhelperprofiles // import "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"
import (
"context"
"errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
)
// ProcessProfilesFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessProfilesFunc func(context.Context, pprofile.Profiles) (pprofile.Profiles, error)
type profiles struct {
component.StartFunc
component.ShutdownFunc
xconsumer.Profiles
}
// Deprecated: [0.116.0] Use xprocessorhelper.ProcessProfilesFunc instead.
type ProcessProfilesFunc = xprocessorhelper.ProcessProfilesFunc
// NewProfiles creates a xprocessor.Profiles that ensure context propagation.
func NewProfiles(
_ context.Context,
_ processor.Settings,
_ component.Config,
nextConsumer xconsumer.Profiles,
profilesFunc ProcessProfilesFunc,
options ...Option,
) (xprocessor.Profiles, error) {
if profilesFunc == nil {
return nil, errors.New("nil profilesFunc")
}
bs := fromOptions(options)
profilesConsumer, err := xconsumer.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) (err error) {
pd, err = profilesFunc(ctx, pd)
if err != nil {
if errors.Is(err, processorhelper.ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeProfiles(ctx, pd)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}
return &profiles{
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Profiles: profilesConsumer,
}, nil
}
// Deprecated: [0.116.0] Use xprocessorhelper.NewProfiles instead.
var NewProfiles = xprocessorhelper.NewProfiles

View File

@ -0,0 +1 @@
include ../../../Makefile.Common

View File

@ -0,0 +1,75 @@
module go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper
go 1.22.0
require (
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.0
go.opentelemetry.io/collector/component/componenttest v0.115.0
go.opentelemetry.io/collector/consumer v1.21.0
go.opentelemetry.io/collector/consumer/consumertest v0.115.0
go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234
go.opentelemetry.io/collector/pdata/pprofile v0.115.0
go.opentelemetry.io/collector/processor v0.115.0
go.opentelemetry.io/collector/processor/processortest v0.115.0
go.opentelemetry.io/collector/processor/xprocessor v0.115.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.115.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect
go.opentelemetry.io/collector/pdata v1.21.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.115.0 // indirect
go.opentelemetry.io/collector/pipeline v0.115.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest
replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile
replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata
replace go.opentelemetry.io/collector/processor => ../../../processor
replace go.opentelemetry.io/collector/consumer => ../../../consumer
replace go.opentelemetry.io/collector/consumer/xconsumer => ../../../consumer/xconsumer
replace go.opentelemetry.io/collector/component => ../../../component
replace go.opentelemetry.io/collector/component/componenttest => ../../../component/componenttest
replace go.opentelemetry.io/collector/pdata => ../../../pdata
replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/configtelemetry
replace go.opentelemetry.io/collector/pipeline => ../../../pipeline
replace go.opentelemetry.io/collector/component/componentstatus => ../../../component/componentstatus
replace go.opentelemetry.io/collector/processor/processortest => ../../processortest
replace go.opentelemetry.io/collector/processor/xprocessor => ../../xprocessor

View File

@ -0,0 +1,98 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4=
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0=
google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package xprocessorhelper // import "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)
// Option apply changes to internalOptions.
type Option interface {
apply(*baseSettings)
}
type optionFunc func(*baseSettings)
func (of optionFunc) apply(e *baseSettings) {
of(e)
}
// WithStart overrides the default Start function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return optionFunc(func(o *baseSettings) {
o.StartFunc = start
})
}
// WithShutdown overrides the default Shutdown function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return optionFunc(func(o *baseSettings) {
o.ShutdownFunc = shutdown
})
}
// WithCapabilities overrides the default GetCapabilities function for an processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return optionFunc(func(o *baseSettings) {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
})
}
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
}
// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
consumerOptions: []consumer.Option{consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})},
}
for _, op := range options {
op.apply(opts)
}
return opts
}

View File

@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package xprocessorhelper // import "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
import (
"context"
"errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"
)
// ProcessProfilesFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessProfilesFunc func(context.Context, pprofile.Profiles) (pprofile.Profiles, error)
type profiles struct {
component.StartFunc
component.ShutdownFunc
xconsumer.Profiles
}
// NewProfiles creates a xprocessor.Profiles that ensure context propagation.
func NewProfiles(
_ context.Context,
_ processor.Settings,
_ component.Config,
nextConsumer xconsumer.Profiles,
profilesFunc ProcessProfilesFunc,
options ...Option,
) (xprocessor.Profiles, error) {
if profilesFunc == nil {
return nil, errors.New("nil profilesFunc")
}
bs := fromOptions(options)
profilesConsumer, err := xconsumer.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) (err error) {
pd, err = profilesFunc(ctx, pd)
if err != nil {
if errors.Is(err, processorhelper.ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeProfiles(ctx, pd)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}
return &profiles{
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Profiles: profilesConsumer,
}, nil
}

View File

@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package xprocessorhelper
import (
"context"
"errors"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processortest"
)
var testProfilesCfg = struct{}{}
func TestNewProfiles(t *testing.T) {
pp, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), newTestPProcessor(nil))
require.NoError(t, err)
assert.True(t, pp.Capabilities().MutatesData)
assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, pp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.NoError(t, pp.Shutdown(context.Background()))
}
func TestNewProfiles_WithOptions(t *testing.T) {
want := errors.New("my_error")
pp, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), newTestPProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
require.NoError(t, err)
assert.Equal(t, want, pp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, pp.Shutdown(context.Background()))
assert.False(t, pp.Capabilities().MutatesData)
}
func TestNewProfiles_NilRequiredFields(t *testing.T) {
_, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
}
func TestNewProfiles_ProcessProfileError(t *testing.T) {
want := errors.New("my_error")
pp, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), newTestPProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, pp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
}
func TestNewProfiles_ProcessProfilesErrSkipProcessingData(t *testing.T) {
pp, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), newTestPProcessor(processorhelper.ErrSkipProcessingData))
require.NoError(t, err)
assert.NoError(t, pp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
}
func newTestPProcessor(retError error) ProcessProfilesFunc {
return func(_ context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) {
return pd, retError
}
}
func TestProfilesConcurrency(t *testing.T) {
profilesFunc := func(_ context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) {
return pd, nil
}
incomingProfiles := pprofile.NewProfiles()
ps := incomingProfiles.ResourceProfiles().AppendEmpty().ScopeProfiles().AppendEmpty().Profiles()
// Add 3 profiles to the incoming
ps.AppendEmpty()
ps.AppendEmpty()
ps.AppendEmpty()
pp, err := NewProfiles(context.Background(), processortest.NewNopSettings(), &testProfilesCfg, consumertest.NewNop(), profilesFunc)
require.NoError(t, err)
assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost()))
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
assert.NoError(t, pp.ConsumeProfiles(context.Background(), incomingProfiles))
}
}()
}
wg.Wait()
assert.NoError(t, pp.Shutdown(context.Background()))
}

View File

@ -79,6 +79,7 @@ module-sets:
- go.opentelemetry.io/collector/processor/memorylimiterprocessor
- go.opentelemetry.io/collector/processor/processorprofiles
- go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles
- go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper
- go.opentelemetry.io/collector/processor/xprocessor
- go.opentelemetry.io/collector/receiver
- go.opentelemetry.io/collector/receiver/nopreceiver