158 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			158 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright The OpenTelemetry Authors
 | |
| // SPDX-License-Identifier: Apache-2.0
 | |
| 
 | |
| package connector
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"testing"
 | |
| 
 | |
| 	"github.com/stretchr/testify/assert"
 | |
| 	"github.com/stretchr/testify/require"
 | |
| 
 | |
| 	"go.opentelemetry.io/collector/consumer"
 | |
| 	"go.opentelemetry.io/collector/consumer/consumertest"
 | |
| 	"go.opentelemetry.io/collector/pdata/plog"
 | |
| 	"go.opentelemetry.io/collector/pdata/testdata"
 | |
| 	"go.opentelemetry.io/collector/pipeline"
 | |
| )
 | |
| 
 | |
| type mutatingLogsSink struct {
 | |
| 	*consumertest.LogsSink
 | |
| }
 | |
| 
 | |
| func (mts *mutatingLogsSink) Capabilities() consumer.Capabilities {
 | |
| 	return consumer.Capabilities{MutatesData: true}
 | |
| }
 | |
| 
 | |
| func TestLogsRouterMultiplexing(t *testing.T) {
 | |
| 	num := 20
 | |
| 	for numIDs := 1; numIDs < num; numIDs++ {
 | |
| 		for numCons := 1; numCons < num; numCons++ {
 | |
| 			for numLogs := 1; numLogs < num; numLogs++ {
 | |
| 				t.Run(
 | |
| 					fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numLogs),
 | |
| 					fuzzLogs(numIDs, numCons, numLogs),
 | |
| 				)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func fuzzLogs(numIDs, numCons, numLogs int) func(*testing.T) {
 | |
| 	return func(t *testing.T) {
 | |
| 		allIDs := make([]pipeline.ID, 0, numCons)
 | |
| 		allCons := make([]consumer.Logs, 0, numCons)
 | |
| 		allConsMap := make(map[pipeline.ID]consumer.Logs)
 | |
| 
 | |
| 		// If any consumer is mutating, the router must report mutating
 | |
| 		for i := 0; i < numCons; i++ {
 | |
| 			allIDs = append(allIDs, pipeline.NewIDWithName(pipeline.SignalLogs, "sink_"+strconv.Itoa(numCons)))
 | |
| 			// Random chance for each consumer to be mutating
 | |
| 			if (numCons+numLogs+i)%4 == 0 {
 | |
| 				allCons = append(allCons, &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)})
 | |
| 			} else {
 | |
| 				allCons = append(allCons, new(consumertest.LogsSink))
 | |
| 			}
 | |
| 			allConsMap[allIDs[i]] = allCons[i]
 | |
| 		}
 | |
| 
 | |
| 		r := NewLogsRouter(allConsMap)
 | |
| 		ld := testdata.GenerateLogs(1)
 | |
| 
 | |
| 		// Keep track of how many logs each consumer should receive.
 | |
| 		// This will be validated after every call to RouteLogs.
 | |
| 		expected := make(map[pipeline.ID]int, numCons)
 | |
| 
 | |
| 		for i := 0; i < numLogs; i++ {
 | |
| 			// Build a random set of ids (no duplicates)
 | |
| 			randCons := make(map[pipeline.ID]bool, numIDs)
 | |
| 			for j := 0; j < numIDs; j++ {
 | |
| 				// This number should be pretty random and less than numCons
 | |
| 				conNum := (numCons + numIDs + i + j) % numCons
 | |
| 				randCons[allIDs[conNum]] = true
 | |
| 			}
 | |
| 
 | |
| 			// Convert to slice, update expectations
 | |
| 			conIDs := make([]pipeline.ID, 0, len(randCons))
 | |
| 			for id := range randCons {
 | |
| 				conIDs = append(conIDs, id)
 | |
| 				expected[id]++
 | |
| 			}
 | |
| 
 | |
| 			// Route to list of consumers
 | |
| 			fanout, err := r.Consumer(conIDs...)
 | |
| 			assert.NoError(t, err)
 | |
| 			assert.NoError(t, fanout.ConsumeLogs(context.Background(), ld))
 | |
| 
 | |
| 			// Validate expectations for all consumers
 | |
| 			for id := range expected {
 | |
| 				logs := []plog.Logs{}
 | |
| 				switch con := allConsMap[id].(type) {
 | |
| 				case *consumertest.LogsSink:
 | |
| 					logs = con.AllLogs()
 | |
| 				case *mutatingLogsSink:
 | |
| 					logs = con.AllLogs()
 | |
| 				}
 | |
| 				assert.Len(t, logs, expected[id])
 | |
| 				for n := 0; n < len(logs); n++ {
 | |
| 					assert.Equal(t, ld, logs[n])
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestLogsRouterConsumers(t *testing.T) {
 | |
| 	ctx := context.Background()
 | |
| 	ld := testdata.GenerateLogs(1)
 | |
| 
 | |
| 	fooID := pipeline.NewIDWithName(pipeline.SignalLogs, "foo")
 | |
| 	barID := pipeline.NewIDWithName(pipeline.SignalLogs, "bar")
 | |
| 
 | |
| 	foo := new(consumertest.LogsSink)
 | |
| 	bar := new(consumertest.LogsSink)
 | |
| 	r := NewLogsRouter(map[pipeline.ID]consumer.Logs{fooID: foo, barID: bar})
 | |
| 
 | |
| 	rcs := r.PipelineIDs()
 | |
| 	assert.Len(t, rcs, 2)
 | |
| 	assert.ElementsMatch(t, []pipeline.ID{fooID, barID}, rcs)
 | |
| 
 | |
| 	assert.Empty(t, foo.AllLogs())
 | |
| 	assert.Empty(t, bar.AllLogs())
 | |
| 
 | |
| 	both, err := r.Consumer(fooID, barID)
 | |
| 	assert.NotNil(t, both)
 | |
| 	assert.NoError(t, err)
 | |
| 
 | |
| 	assert.NoError(t, both.ConsumeLogs(ctx, ld))
 | |
| 	assert.Len(t, foo.AllLogs(), 1)
 | |
| 	assert.Len(t, bar.AllLogs(), 1)
 | |
| 
 | |
| 	fooOnly, err := r.Consumer(fooID)
 | |
| 	assert.NotNil(t, fooOnly)
 | |
| 	assert.NoError(t, err)
 | |
| 
 | |
| 	assert.NoError(t, fooOnly.ConsumeLogs(ctx, ld))
 | |
| 	assert.Len(t, foo.AllLogs(), 2)
 | |
| 	assert.Len(t, bar.AllLogs(), 1)
 | |
| 
 | |
| 	barOnly, err := r.Consumer(barID)
 | |
| 	assert.NotNil(t, barOnly)
 | |
| 	assert.NoError(t, err)
 | |
| 
 | |
| 	assert.NoError(t, barOnly.ConsumeLogs(ctx, ld))
 | |
| 	assert.Len(t, foo.AllLogs(), 2)
 | |
| 	assert.Len(t, bar.AllLogs(), 2)
 | |
| 
 | |
| 	none, err := r.Consumer()
 | |
| 	assert.Nil(t, none)
 | |
| 	require.Error(t, err)
 | |
| 
 | |
| 	fake, err := r.Consumer(pipeline.NewIDWithName(pipeline.SignalLogs, "fake"))
 | |
| 	assert.Nil(t, fake)
 | |
| 	assert.Error(t, err)
 | |
| }
 |