#### Description This PR introduces a testable example to the package [processor](/processor). it's a similar example to the documentation one on https://opentelemetry.io/docs/collector/building/receiver/#designing-and-validating-receiver-settings #### Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/5675#issuecomment-2796741807 #### Testing the testable example was executed successfully:  #### Documentation this testable example follows https://go.dev/blog/examples#larger-examples format and should be automatically added to `processor` package documentation
This commit is contained in:
parent
fd4d3c4465
commit
d904716c77
|
|
@ -11,6 +11,7 @@ Some important aspects of pipelines and processors to be aware of:
|
|||
- [Exclusive Ownership](#exclusive-ownership)
|
||||
- [Shared Ownership](#shared-ownership)
|
||||
- [Ordering Processors](#ordering-processors)
|
||||
- [Creating Custom Processor](#creating-custom-processors)
|
||||
|
||||
Supported processors (sorted alphabetically):
|
||||
- [Batch Processor](batchprocessor/README.md)
|
||||
|
|
@ -107,3 +108,8 @@ data cloning described in Exclusive Ownership section.
|
|||
|
||||
The order processors are specified in a pipeline is important as this is the
|
||||
order in which each processor is applied.
|
||||
|
||||
## Creating Custom Processors
|
||||
|
||||
To create a custom processor for the OpenTelemetry Collector, you need to implement the processor interface, define the processor's configuration, and register it with the Collector. The process typically involves creating a factory, implementing the required processing logic, and handling configuration options. For a practical example and guidance, refer to the [`processorhelper`](https://pkg.go.dev/go.opentelemetry.io/collector/processor/processorhelper) package, which provides utilities and patterns to simplify processor development.
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package processorhelper_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
"go.opentelemetry.io/collector/processor"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
// typeStr defines the unique type identifier for the processor.
|
||||
var typeStr = component.MustNewType("example")
|
||||
|
||||
// exampleConfig holds configuration settings for the processor.
|
||||
type exampleConfig struct{}
|
||||
|
||||
// exampleProcessor implements the OpenTelemetry processor interface.
|
||||
type exampleProcessor struct {
|
||||
cancel context.CancelFunc
|
||||
config exampleConfig
|
||||
}
|
||||
|
||||
// Example demonstrates the usage of the processor factory.
|
||||
func Example() {
|
||||
// Instantiate the processor factory and print its type.
|
||||
exampleProcessor := NewFactory()
|
||||
fmt.Println(exampleProcessor.Type())
|
||||
|
||||
// Output:
|
||||
// example
|
||||
}
|
||||
|
||||
// NewFactory creates a new processor factory.
|
||||
func NewFactory() processor.Factory {
|
||||
return processor.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processor.WithMetrics(createExampleProcessor, component.StabilityLevelAlpha),
|
||||
)
|
||||
}
|
||||
|
||||
// createDefaultConfig returns the default configuration for the processor.
|
||||
func createDefaultConfig() component.Config {
|
||||
return &exampleConfig{}
|
||||
}
|
||||
|
||||
// createExampleProcessor initializes an instance of the example processor.
|
||||
func createExampleProcessor(ctx context.Context, params processor.Settings, baseCfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
|
||||
// Convert baseCfg to the correct type.
|
||||
cfg := baseCfg.(*exampleConfig)
|
||||
|
||||
// Create a new processor instance.
|
||||
pcsr := newExampleProcessor(ctx, cfg)
|
||||
|
||||
// Wrap the processor with the helper utilities.
|
||||
return processorhelper.NewMetrics(
|
||||
ctx,
|
||||
params,
|
||||
cfg,
|
||||
next,
|
||||
pcsr.consumeMetrics,
|
||||
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
|
||||
processorhelper.WithShutdown(pcsr.shutdown),
|
||||
)
|
||||
}
|
||||
|
||||
// newExampleProcessor constructs a new instance of the example processor.
|
||||
func newExampleProcessor(ctx context.Context, cfg *exampleConfig) *exampleProcessor {
|
||||
pcsr := &exampleProcessor{
|
||||
config: *cfg,
|
||||
}
|
||||
|
||||
// Create a cancelable context.
|
||||
_, pcsr.cancel = context.WithCancel(ctx)
|
||||
|
||||
return pcsr
|
||||
}
|
||||
|
||||
// ConsumeMetrics modify metrics adding one attribute to resource.
|
||||
func (pcsr *exampleProcessor) consumeMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
|
||||
rm := md.ResourceMetrics()
|
||||
for i := 0; i < rm.Len(); i++ {
|
||||
rm.At(i).Resource().Attributes().PutStr("processed_by", "exampleProcessor")
|
||||
}
|
||||
|
||||
return md, nil
|
||||
}
|
||||
|
||||
// Shutdown properly stops the processor and releases resources.
|
||||
func (pcsr *exampleProcessor) shutdown(_ context.Context) error {
|
||||
pcsr.cancel()
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue