160 lines
4.6 KiB
Go
160 lines
4.6 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package xconnector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"go.opentelemetry.io/collector/consumer"
|
|
"go.opentelemetry.io/collector/consumer/consumertest"
|
|
"go.opentelemetry.io/collector/consumer/xconsumer"
|
|
"go.opentelemetry.io/collector/pdata/pprofile"
|
|
"go.opentelemetry.io/collector/pdata/testdata"
|
|
"go.opentelemetry.io/collector/pipeline"
|
|
"go.opentelemetry.io/collector/pipeline/xpipeline"
|
|
)
|
|
|
|
type mutatingProfilesSink struct {
|
|
*consumertest.ProfilesSink
|
|
}
|
|
|
|
func (mts *mutatingProfilesSink) Capabilities() consumer.Capabilities {
|
|
return consumer.Capabilities{MutatesData: true}
|
|
}
|
|
|
|
func TestProfilesRouterMultiplexing(t *testing.T) {
|
|
num := 20
|
|
for numIDs := 1; numIDs < num; numIDs++ {
|
|
for numCons := 1; numCons < num; numCons++ {
|
|
for numProfiles := 1; numProfiles < num; numProfiles++ {
|
|
t.Run(
|
|
fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numProfiles),
|
|
fuzzProfiles(numIDs, numCons, numProfiles),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func fuzzProfiles(numIDs, numCons, numProfiles int) func(*testing.T) {
|
|
return func(t *testing.T) {
|
|
allIDs := make([]pipeline.ID, 0, numCons)
|
|
allCons := make([]xconsumer.Profiles, 0, numCons)
|
|
allConsMap := make(map[pipeline.ID]xconsumer.Profiles)
|
|
|
|
// If any consumer is mutating, the router must report mutating
|
|
for i := 0; i < numCons; i++ {
|
|
allIDs = append(allIDs, pipeline.NewIDWithName(xpipeline.SignalProfiles, "sink_"+strconv.Itoa(numCons)))
|
|
// Random chance for each consumer to be mutating
|
|
if (numCons+numProfiles+i)%4 == 0 {
|
|
allCons = append(allCons, &mutatingProfilesSink{ProfilesSink: new(consumertest.ProfilesSink)})
|
|
} else {
|
|
allCons = append(allCons, new(consumertest.ProfilesSink))
|
|
}
|
|
allConsMap[allIDs[i]] = allCons[i]
|
|
}
|
|
|
|
r := NewProfilesRouter(allConsMap)
|
|
td := testdata.GenerateProfiles(1)
|
|
|
|
// Keep track of how many logs each consumer should receive.
|
|
// This will be validated after every call to RouteProfiles.
|
|
expected := make(map[pipeline.ID]int, numCons)
|
|
|
|
for i := 0; i < numProfiles; i++ {
|
|
// Build a random set of ids (no duplicates)
|
|
randCons := make(map[pipeline.ID]bool, numIDs)
|
|
for j := 0; j < numIDs; j++ {
|
|
// This number should be pretty random and less than numCons
|
|
conNum := (numCons + numIDs + i + j) % numCons
|
|
randCons[allIDs[conNum]] = true
|
|
}
|
|
|
|
// Convert to slice, update expectations
|
|
conIDs := make([]pipeline.ID, 0, len(randCons))
|
|
for id := range randCons {
|
|
conIDs = append(conIDs, id)
|
|
expected[id]++
|
|
}
|
|
|
|
// Route to list of consumers
|
|
fanout, err := r.Consumer(conIDs...)
|
|
assert.NoError(t, err)
|
|
assert.NoError(t, fanout.ConsumeProfiles(context.Background(), td))
|
|
|
|
// Validate expectations for all consumers
|
|
for id := range expected {
|
|
profiles := []pprofile.Profiles{}
|
|
switch con := allConsMap[id].(type) {
|
|
case *consumertest.ProfilesSink:
|
|
profiles = con.AllProfiles()
|
|
case *mutatingProfilesSink:
|
|
profiles = con.AllProfiles()
|
|
}
|
|
assert.Len(t, profiles, expected[id])
|
|
for n := 0; n < len(profiles); n++ {
|
|
assert.Equal(t, td, profiles[n])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestProfilessRouterConsumer(t *testing.T) {
|
|
ctx := context.Background()
|
|
td := testdata.GenerateProfiles(1)
|
|
|
|
fooID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "foo")
|
|
barID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "bar")
|
|
|
|
foo := new(consumertest.ProfilesSink)
|
|
bar := new(consumertest.ProfilesSink)
|
|
r := NewProfilesRouter(map[pipeline.ID]xconsumer.Profiles{fooID: foo, barID: bar})
|
|
|
|
rcs := r.PipelineIDs()
|
|
assert.Len(t, rcs, 2)
|
|
assert.ElementsMatch(t, []pipeline.ID{fooID, barID}, rcs)
|
|
|
|
assert.Empty(t, foo.AllProfiles())
|
|
assert.Empty(t, bar.AllProfiles())
|
|
|
|
both, err := r.Consumer(fooID, barID)
|
|
assert.NotNil(t, both)
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, both.ConsumeProfiles(ctx, td))
|
|
assert.Len(t, foo.AllProfiles(), 1)
|
|
assert.Len(t, bar.AllProfiles(), 1)
|
|
|
|
fooOnly, err := r.Consumer(fooID)
|
|
assert.NotNil(t, fooOnly)
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, fooOnly.ConsumeProfiles(ctx, td))
|
|
assert.Len(t, foo.AllProfiles(), 2)
|
|
assert.Len(t, bar.AllProfiles(), 1)
|
|
|
|
barOnly, err := r.Consumer(barID)
|
|
assert.NotNil(t, barOnly)
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, barOnly.ConsumeProfiles(ctx, td))
|
|
assert.Len(t, foo.AllProfiles(), 2)
|
|
assert.Len(t, bar.AllProfiles(), 2)
|
|
|
|
none, err := r.Consumer()
|
|
assert.Nil(t, none)
|
|
require.Error(t, err)
|
|
|
|
fake, err := r.Consumer(pipeline.NewIDWithName(xpipeline.SignalProfiles, "fake"))
|
|
assert.Nil(t, fake)
|
|
assert.Error(t, err)
|
|
}
|