[chore] Add internal attribute package (#12073)
This PR creates a new internal package which defines the correct set of attributes for each kind of component. This is a subset of #12057 which is broken off in order to reduce the overall size of that PR. As such, this package will not actually be used until #12057 is merged.
This commit is contained in:
parent
81f1fad0ee
commit
52d1414547
|
|
@ -0,0 +1,103 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package attribute // import "go.opentelemetry.io/collector/service/internal/attribute"
|
||||
|
||||
import (
|
||||
"hash/fnv"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/pipeline"
|
||||
)
|
||||
|
||||
const (
|
||||
componentKindKey = "otelcol.component.kind"
|
||||
componentIDKey = "otelcol.component.id"
|
||||
pipelineIDKey = "otelcol.pipeline.id"
|
||||
signalKey = "otelcol.signal"
|
||||
signalOutputKey = "otelcol.signal.output"
|
||||
|
||||
capabiltiesKind = "capabilities"
|
||||
fanoutKind = "fanout"
|
||||
)
|
||||
|
||||
type Attributes struct {
|
||||
set attribute.Set
|
||||
id int64
|
||||
}
|
||||
|
||||
func newAttributes(attrs ...attribute.KeyValue) *Attributes {
|
||||
h := fnv.New64a()
|
||||
for _, kv := range attrs {
|
||||
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
|
||||
}
|
||||
return &Attributes{
|
||||
set: attribute.NewSet(attrs...),
|
||||
id: int64(h.Sum64()), // #nosec G115
|
||||
}
|
||||
}
|
||||
|
||||
func (a Attributes) Attributes() *attribute.Set {
|
||||
return &a.set
|
||||
}
|
||||
|
||||
func (a Attributes) ID() int64 {
|
||||
return a.id
|
||||
}
|
||||
|
||||
func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, component.KindReceiver.String()),
|
||||
attribute.String(signalKey, pipelineType.String()),
|
||||
attribute.String(componentIDKey, id.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, component.KindProcessor.String()),
|
||||
attribute.String(signalKey, pipelineID.Signal().String()),
|
||||
attribute.String(pipelineIDKey, pipelineID.String()),
|
||||
attribute.String(componentIDKey, id.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, component.KindExporter.String()),
|
||||
attribute.String(signalKey, pipelineType.String()),
|
||||
attribute.String(componentIDKey, id.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, component.KindConnector.String()),
|
||||
attribute.String(signalKey, exprPipelineType.String()),
|
||||
attribute.String(signalOutputKey, rcvrPipelineType.String()),
|
||||
attribute.String(componentIDKey, id.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Capabilities(pipelineID pipeline.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, capabiltiesKind),
|
||||
attribute.String(pipelineIDKey, pipelineID.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Fanout(pipelineID pipeline.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, fanoutKind),
|
||||
attribute.String(pipelineIDKey, pipelineID.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func Extension(id component.ID) *Attributes {
|
||||
return newAttributes(
|
||||
attribute.String(componentKindKey, component.KindExtension.String()),
|
||||
attribute.String(componentIDKey, id.String()),
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,193 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package attribute
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/pipeline"
|
||||
"go.opentelemetry.io/collector/pipeline/xpipeline"
|
||||
)
|
||||
|
||||
var (
|
||||
signals = []pipeline.Signal{
|
||||
pipeline.SignalTraces,
|
||||
pipeline.SignalMetrics,
|
||||
pipeline.SignalLogs,
|
||||
xpipeline.SignalProfiles,
|
||||
}
|
||||
|
||||
cIDs = []component.ID{
|
||||
component.MustNewID("foo"),
|
||||
component.MustNewID("foo2"),
|
||||
component.MustNewID("bar"),
|
||||
}
|
||||
|
||||
pIDs = []pipeline.ID{
|
||||
pipeline.MustNewID("traces"),
|
||||
pipeline.MustNewIDWithName("traces", "2"),
|
||||
pipeline.MustNewID("metrics"),
|
||||
pipeline.MustNewIDWithName("metrics", "2"),
|
||||
pipeline.MustNewID("logs"),
|
||||
pipeline.MustNewIDWithName("logs", "2"),
|
||||
pipeline.MustNewID("profiles"),
|
||||
pipeline.MustNewIDWithName("profiles", "2"),
|
||||
}
|
||||
)
|
||||
|
||||
func TestReceiver(t *testing.T) {
|
||||
for _, sig := range signals {
|
||||
for _, id := range cIDs {
|
||||
r := Receiver(sig, id)
|
||||
componentKind, ok := r.Attributes().Value(componentKindKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, component.KindReceiver.String(), componentKind.AsString())
|
||||
|
||||
signal, ok := r.Attributes().Value(signalKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, sig.String(), signal.AsString())
|
||||
|
||||
componentID, ok := r.Attributes().Value(componentIDKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, id.String(), componentID.AsString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessor(t *testing.T) {
|
||||
for _, pID := range pIDs {
|
||||
for _, id := range cIDs {
|
||||
p := Processor(pID, id)
|
||||
componentKind, ok := p.Attributes().Value(componentKindKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, component.KindProcessor.String(), componentKind.AsString())
|
||||
|
||||
pipelineID, ok := p.Attributes().Value(pipelineIDKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, pID.String(), pipelineID.AsString())
|
||||
|
||||
componentID, ok := p.Attributes().Value(componentIDKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, id.String(), componentID.AsString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestExporter(t *testing.T) {
|
||||
for _, sig := range signals {
|
||||
for _, id := range cIDs {
|
||||
e := Exporter(sig, id)
|
||||
componentKind, ok := e.Attributes().Value(componentKindKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, component.KindExporter.String(), componentKind.AsString())
|
||||
|
||||
signal, ok := e.Attributes().Value(signalKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, sig.String(), signal.AsString())
|
||||
|
||||
componentID, ok := e.Attributes().Value(componentIDKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, id.String(), componentID.AsString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnector(t *testing.T) {
|
||||
for _, exprSig := range signals {
|
||||
for _, rcvrSig := range signals {
|
||||
for _, id := range cIDs {
|
||||
c := Connector(exprSig, rcvrSig, id)
|
||||
componentKind, ok := c.Attributes().Value(componentKindKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, component.KindConnector.String(), componentKind.AsString())
|
||||
|
||||
signal, ok := c.Attributes().Value(signalKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, exprSig.String(), signal.AsString())
|
||||
|
||||
signalOutput, ok := c.Attributes().Value(signalOutputKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, rcvrSig.String(), signalOutput.AsString())
|
||||
|
||||
componentID, ok := c.Attributes().Value(componentIDKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, id.String(), componentID.AsString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtension(t *testing.T) {
|
||||
e := Extension(component.MustNewID("foo"))
|
||||
componentKind, ok := e.Attributes().Value(componentKindKey)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, component.KindExtension.String(), componentKind.AsString())
|
||||
}
|
||||
|
||||
func TestSetEquality(t *testing.T) {
|
||||
// The sets are created independently but should be exactly equivalent.
|
||||
// We will ensure that corresponding elements are equal and that
|
||||
// non-corresponding elements are not equal.
|
||||
setI, setJ := createExampleSets(), createExampleSets()
|
||||
for i, ei := range setI {
|
||||
for j, ej := range setJ {
|
||||
if i == j {
|
||||
require.Equal(t, ei.ID(), ej.ID())
|
||||
require.True(t, ei.Attributes().Equals(ej.Attributes()))
|
||||
} else {
|
||||
require.NotEqual(t, ei.ID(), ej.ID())
|
||||
require.False(t, ei.Attributes().Equals(ej.Attributes()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createExampleSets() []*Attributes {
|
||||
sets := []*Attributes{}
|
||||
|
||||
// Receiver examples.
|
||||
for _, sig := range signals {
|
||||
for _, id := range cIDs {
|
||||
sets = append(sets, Receiver(sig, id))
|
||||
}
|
||||
}
|
||||
|
||||
// Processor examples.
|
||||
for _, pID := range pIDs {
|
||||
for _, cID := range cIDs {
|
||||
sets = append(sets, Processor(pID, cID))
|
||||
}
|
||||
}
|
||||
|
||||
// Exporter examples.
|
||||
for _, sig := range signals {
|
||||
for _, id := range cIDs {
|
||||
sets = append(sets, Exporter(sig, id))
|
||||
}
|
||||
}
|
||||
|
||||
// Connector examples.
|
||||
for _, exprSig := range signals {
|
||||
for _, rcvrSig := range signals {
|
||||
for _, id := range cIDs {
|
||||
sets = append(sets, Connector(exprSig, rcvrSig, id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Capabilities examples.
|
||||
for _, pID := range pIDs {
|
||||
sets = append(sets, Capabilities(pID))
|
||||
}
|
||||
|
||||
// Fanout examples.
|
||||
for _, pID := range pIDs {
|
||||
sets = append(sets, Fanout(pID))
|
||||
}
|
||||
|
||||
return sets
|
||||
}
|
||||
Loading…
Reference in New Issue