// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package receivertest // import "go.opentelemetry.io/collector/receiver/receivertest"

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/pipeline"
	"go.opentelemetry.io/collector/receiver"
)

// UniqueIDAttrName is the attribute name that is used in log records/spans/datapoints as the unique identifier.
const UniqueIDAttrName = "test_id"

// UniqueIDAttrVal is the value type of the UniqueIDAttrName attribute.
type UniqueIDAttrVal string

// Generator is an interface for generating data and sending it to the receiver under test.
type Generator interface {
	// Start the generator and prepare to generate. Will be followed by calls to Generate().
	// Start() may be called again after Stop() is called to begin a new test scenario.
	Start()

	// Stop generating. There will be no more calls to Generate() until Start() is called again.
	Stop()

	// Generate must generate and send at least one data element (a span, a log record or a
	// metric data point) to the receiver and return the ids of the generated elements.
	// The generated data must contain uniquely identifiable elements, each with a
	// different value of the attribute named UniqueIDAttrName.
	// CreateOneLogWithID() can be used as a helper to create such logs.
	// May be called concurrently from multiple goroutines.
	Generate() []UniqueIDAttrVal
}
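
// exampleLogGenerator is a minimal, hypothetical Generator sketch, kept here only to
// illustrate the interface; it is not part of the public API. It assumes the receiver
// under test exposes some in-process push mechanism (abstracted below as the sendFunc
// field) and uses the CreateOneLogWithID helper defined at the bottom of this file.
// A real generator would instead send data over the receiver's own protocol, for
// example via an OTLP client.
type exampleLogGenerator struct {
	sequence atomic.Int64
	sendFunc func(plog.Logs) // hypothetical transport into the receiver under test
}

func (g *exampleLogGenerator) Start() {}

func (g *exampleLogGenerator) Stop() {}

func (g *exampleLogGenerator) Generate() []UniqueIDAttrVal {
	// Produce a process-unique id, attach it to one log record and send it.
	id := UniqueIDAttrVal(fmt.Sprintf("gen-%d", g.sequence.Add(1)))
	g.sendFunc(CreateOneLogWithID(id))
	return []UniqueIDAttrVal{id}
}

var _ Generator = (*exampleLogGenerator)(nil)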
// CheckConsumeContractParams contains the parameters for a CheckConsumeContract call.
type CheckConsumeContractParams struct {
	T *testing.T
	// Factory that creates the receiver to test.
	Factory receiver.Factory
	// Signal to test the contract for (logs, traces or metrics).
	Signal pipeline.Signal
	// Config of the receiver to use.
	Config component.Config
	// Generator that can send data to the receiver.
	Generator Generator
	// GenerateCount specifies the number of times to call Generator.Generate()
	// for each test scenario.
	GenerateCount int
}

// CheckConsumeContract checks the contract between the receiver and its next consumer. For the
// contract description see ../doc.go. The checker detects contract violations across different
// scenarios: success, permanent errors, non-permanent errors, and a mix of error types.
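//
// A hypothetical usage sketch from a receiver's own test package; the myreceiver package
// and the newTestLogGenerator helper below are assumptions for illustration, not real APIs:
//
//	func TestConsumeContract(t *testing.T) {
//		factory := myreceiver.NewFactory()
//		receivertest.CheckConsumeContract(receivertest.CheckConsumeContractParams{
//			T:             t,
//			Factory:       factory,
//			Signal:        pipeline.SignalLogs,
//			Config:        factory.CreateDefaultConfig(),
//			Generator:     newTestLogGenerator(),
//			GenerateCount: 100,
//		})
//	}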
func CheckConsumeContract(params CheckConsumeContractParams) {
	// Different scenarios to test for.
	// The decision function defines the testing scenario (i.e. whether to test the success
	// case, an error case or a mix of both). See for example randomErrorsConsumeDecision.
	scenarios := []struct {
		name         string
		decisionFunc func(ids idSet) error
	}{
		{
			name: "always_succeed",
			// Always succeed. We expect all data to be delivered as is.
			decisionFunc: func(idSet) error { return nil },
		},
		{
			name:         "random_non_permanent_error",
			decisionFunc: randomNonPermanentErrorConsumeDecision,
		},
		{
			name:         "random_permanent_error",
			decisionFunc: randomPermanentErrorConsumeDecision,
		},
		{
			name:         "random_error",
			decisionFunc: randomErrorsConsumeDecision,
		},
	}

	for _, scenario := range scenarios {
		params.T.Run(
			scenario.name, func(*testing.T) {
				checkConsumeContractScenario(params, scenario.decisionFunc)
			},
		)
	}
}

func checkConsumeContractScenario(params CheckConsumeContractParams, decisionFunc func(ids idSet) error) {
	consumer := &mockConsumer{t: params.T, consumeDecisionFunc: decisionFunc, acceptedIDs: make(idSet), droppedIDs: make(idSet)}
	ctx := context.Background()

	// Create and start the receiver.
	var receiver component.Component
	var err error
	switch params.Signal {
	case pipeline.SignalLogs:
		receiver, err = params.Factory.CreateLogs(ctx, NewNopSettings(), params.Config, consumer)
	case pipeline.SignalTraces:
		receiver, err = params.Factory.CreateTraces(ctx, NewNopSettings(), params.Config, consumer)
	case pipeline.SignalMetrics:
		receiver, err = params.Factory.CreateMetrics(ctx, NewNopSettings(), params.Config, consumer)
	default:
		require.FailNow(params.T, "must specify a valid Signal to test for")
	}
	require.NoError(params.T, err)

	err = receiver.Start(ctx, componenttest.NewNopHost())
	require.NoError(params.T, err)

	// Begin generating data to the receiver.
	generatedIDs := make(idSet)
	var generatedIndex int64
	var mux sync.Mutex
	var wg sync.WaitGroup

	const concurrency = 4

	params.Generator.Start()
	defer params.Generator.Stop()

	// Create concurrent goroutines that use the generator.
	// The total number of generator calls will be equal to params.GenerateCount.
	for j := 0; j < concurrency; j++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for atomic.AddInt64(&generatedIndex, 1) <= int64(params.GenerateCount) {
				ids := params.Generator.Generate()
				require.NotEmpty(params.T, ids)

				mux.Lock()
				duplicates := generatedIDs.mergeSlice(ids)
				mux.Unlock()

				// Check that the generator works correctly. There must not be any
				// duplicates in the generated data set.
				require.Empty(params.T, duplicates)
			}
		}()
	}

	// Wait until all generator goroutines are done.
	wg.Wait()

	// Wait until all data is seen by the consumer.
	assert.Eventually(params.T, func() bool {
		// Calculate the union of accepted and dropped data.
		acceptedAndDropped, duplicates := consumer.acceptedAndDropped()
		if len(duplicates) != 0 {
			assert.Failf(params.T, "found duplicate elements in accepted and dropped data", "keys=%v", duplicates)
		}
		// Compare accepted+dropped with generated. Once they are equal it means all data
		// has been seen by the consumer.
		missingInOther, onlyInOther := generatedIDs.compare(acceptedAndDropped)
		return len(missingInOther) == 0 && len(onlyInOther) == 0
	}, 5*time.Second, 10*time.Millisecond)

	// Do some final checks. Need the union of accepted and dropped data again.
	acceptedAndDropped, duplicates := consumer.acceptedAndDropped()
	if len(duplicates) != 0 {
		assert.Failf(params.T, "found duplicate elements in accepted and dropped data", "keys=%v", duplicates)
	}

	// Make sure generated and accepted+dropped are exactly the same.
	missingInOther, onlyInOther := generatedIDs.compare(acceptedAndDropped)
	if len(missingInOther) != 0 {
		assert.Failf(params.T, "found elements sent that were not delivered", "keys=%v", missingInOther)
	}
	if len(onlyInOther) != 0 {
		assert.Failf(params.T, "found elements in accepted and dropped data that were never sent", "keys=%v", onlyInOther)
	}

	err = receiver.Shutdown(ctx)
	require.NoError(params.T, err)

	// Print some stats to help debug test failures.
	fmt.Printf(
		"Sent %d, accepted=%d, expected dropped=%d, non-permanent errors retried=%d\n",
		len(generatedIDs),
		len(consumer.acceptedIDs),
		len(consumer.droppedIDs),
		consumer.nonPermanentFailures,
	)
}
// idSet is a set of unique ids of data elements used in the test (logs, spans or metric data points).
type idSet map[UniqueIDAttrVal]bool

// compare this set to another set and calculate the differences between them. For example,
// comparing {a, b} with {b, c} yields missingInOther=[a] and onlyInOther=[c].
func (ds idSet) compare(other idSet) (missingInOther, onlyInOther []UniqueIDAttrVal) {
	for k := range ds {
		if _, ok := other[k]; !ok {
			missingInOther = append(missingInOther, k)
		}
	}
	for k := range other {
		if _, ok := ds[k]; !ok {
			onlyInOther = append(onlyInOther, k)
		}
	}
	return
}

// merge another set into this one and return a list of duplicate ids.
func (ds idSet) merge(other idSet) (duplicates []UniqueIDAttrVal) {
	for k, v := range other {
		if _, ok := ds[k]; ok {
			duplicates = append(duplicates, k)
		} else {
			ds[k] = v
		}
	}
	return
}

// mergeSlice merges a slice of ids into this set and returns a list of duplicate ids.
func (ds idSet) mergeSlice(other []UniqueIDAttrVal) (duplicates []UniqueIDAttrVal) {
	for _, id := range other {
		if _, ok := ds[id]; ok {
			duplicates = append(duplicates, id)
		} else {
			ds[id] = true
		}
	}
	return
}

// union computes the union of this set and another. A new set is created to hold the result.
// Also returns a list of any duplicate ids found.
func (ds idSet) union(other idSet) (union idSet, duplicates []UniqueIDAttrVal) {
	union = map[UniqueIDAttrVal]bool{}
	for k, v := range ds {
		union[k] = v
	}
	for k, v := range other {
		if _, ok := union[k]; ok {
			duplicates = append(duplicates, k)
		} else {
			union[k] = v
		}
	}
	return
}

// consumeDecisionFunc is a function that returns a value indicating what the receiver's next
// consumer decides to do as a result of a ConsumeLogs/ConsumeTraces/ConsumeMetrics call.
// The result of the decision function becomes the return value of ConsumeLogs/ConsumeTraces/ConsumeMetrics.
// Supplying different decision functions makes it possible to test different scenarios of the
// contract between the receiver and its next consumer.
type consumeDecisionFunc func(ids idSet) error

var (
	errNonPermanent = errors.New("non permanent error")
	errPermanent    = errors.New("permanent error")
)
// randomNonPermanentErrorConsumeDecision is a decision function that succeeds approximately
// half of the time and fails with a non-permanent error the rest of the time.
func randomNonPermanentErrorConsumeDecision(idSet) error {
	if rand.Float32() < 0.5 {
		return errNonPermanent
	}
	return nil
}

// randomPermanentErrorConsumeDecision is a decision function that succeeds approximately
// half of the time and fails with a permanent error the rest of the time.
func randomPermanentErrorConsumeDecision(idSet) error {
	if rand.Float32() < 0.5 {
		return consumererror.NewPermanent(errPermanent)
	}
	return nil
}

// randomErrorsConsumeDecision is a decision function that succeeds approximately
// a third of the time, fails with a permanent error a third of the time and fails with
// a non-permanent error the rest of the time.
func randomErrorsConsumeDecision(idSet) error {
	r := rand.Float64()
	third := 1.0 / 3.0
	if r < third {
		return consumererror.NewPermanent(errPermanent)
	}
	if r < 2*third {
		return errNonPermanent
	}
	return nil
}
// mockConsumer accepts or drops the data from the receiver based on the decision made by
// consumeDecisionFunc and remembers the accepted and dropped data sets for later checks.
// mockConsumer implements all 3 consume functions: ConsumeLogs/ConsumeTraces/ConsumeMetrics
// and can be used for testing any of the 3 signals.
type mockConsumer struct {
	t                    *testing.T
	consumeDecisionFunc  consumeDecisionFunc
	mux                  sync.Mutex
	acceptedIDs          idSet
	droppedIDs           idSet
	nonPermanentFailures int
}

func (m *mockConsumer) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{}
}

func (m *mockConsumer) ConsumeTraces(_ context.Context, data ptrace.Traces) error {
	ids, err := idSetFromTraces(data)
	require.NoError(m.t, err)
	return m.consume(ids)
}

// idSetFromTraces computes an idSet from the given ptrace.Traces. The idSet will contain the ids of all spans.
func idSetFromTraces(data ptrace.Traces) (idSet, error) {
	ds := map[UniqueIDAttrVal]bool{}
	rss := data.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		ils := rss.At(i).ScopeSpans()
		for j := 0; j < ils.Len(); j++ {
			ss := ils.At(j).Spans()
			for k := 0; k < ss.Len(); k++ {
				elem := ss.At(k)
				key, exists := elem.Attributes().Get(UniqueIDAttrName)
				if !exists {
					return ds, fmt.Errorf("invalid data element, attribute %q is missing", UniqueIDAttrName)
				}
				if key.Type() != pcommon.ValueTypeStr {
					return ds, fmt.Errorf("invalid data element, attribute %q is wrong type %v", UniqueIDAttrName, key.Type())
				}
				ds[UniqueIDAttrVal(key.Str())] = true
			}
		}
	}
	return ds, nil
}

func (m *mockConsumer) ConsumeLogs(_ context.Context, data plog.Logs) error {
	ids, err := idSetFromLogs(data)
	require.NoError(m.t, err)
	return m.consume(ids)
}

// idSetFromLogs computes an idSet from the given plog.Logs. The idSet will contain the ids of all log records.
func idSetFromLogs(data plog.Logs) (idSet, error) {
	ds := map[UniqueIDAttrVal]bool{}
	rss := data.ResourceLogs()
	for i := 0; i < rss.Len(); i++ {
		ils := rss.At(i).ScopeLogs()
		for j := 0; j < ils.Len(); j++ {
			ss := ils.At(j).LogRecords()
			for k := 0; k < ss.Len(); k++ {
				elem := ss.At(k)
				key, exists := elem.Attributes().Get(UniqueIDAttrName)
				if !exists {
					return ds, fmt.Errorf("invalid data element, attribute %q is missing", UniqueIDAttrName)
				}
				if key.Type() != pcommon.ValueTypeStr {
					return ds, fmt.Errorf("invalid data element, attribute %q is wrong type %v", UniqueIDAttrName, key.Type())
				}
				ds[UniqueIDAttrVal(key.Str())] = true
			}
		}
	}
	return ds, nil
}
func (m *mockConsumer) ConsumeMetrics(_ context.Context, data pmetric.Metrics) error {
	ids, err := idSetFromMetrics(data)
	require.NoError(m.t, err)
	return m.consume(ids)
}

// idSetFromMetrics computes an idSet from the given pmetric.Metrics. The idSet will contain the ids of all metric data points.
func idSetFromMetrics(data pmetric.Metrics) (idSet, error) {
	ds := map[UniqueIDAttrVal]bool{}
	rss := data.ResourceMetrics()
	for i := 0; i < rss.Len(); i++ {
		ils := rss.At(i).ScopeMetrics()
		for j := 0; j < ils.Len(); j++ {
			ss := ils.At(j).Metrics()
			for k := 0; k < ss.Len(); k++ {
				elem := ss.At(k)
				switch elem.Type() {
				case pmetric.MetricTypeGauge:
					for l := 0; l < elem.Gauge().DataPoints().Len(); l++ {
						dp := elem.Gauge().DataPoints().At(l)
						if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil {
							return ds, err
						}
					}
				case pmetric.MetricTypeSum:
					for l := 0; l < elem.Sum().DataPoints().Len(); l++ {
						dp := elem.Sum().DataPoints().At(l)
						if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil {
							return ds, err
						}
					}
				case pmetric.MetricTypeSummary:
					for l := 0; l < elem.Summary().DataPoints().Len(); l++ {
						dp := elem.Summary().DataPoints().At(l)
						if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil {
							return ds, err
						}
					}
				case pmetric.MetricTypeHistogram:
					for l := 0; l < elem.Histogram().DataPoints().Len(); l++ {
						dp := elem.Histogram().DataPoints().At(l)
						if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil {
							return ds, err
						}
					}
				case pmetric.MetricTypeExponentialHistogram:
					for l := 0; l < elem.ExponentialHistogram().DataPoints().Len(); l++ {
						dp := elem.ExponentialHistogram().DataPoints().At(l)
						if err := idSetFromDataPoint(ds, dp.Attributes()); err != nil {
							return ds, err
						}
					}
				}
			}
		}
	}
	return ds, nil
}

// idSetFromDataPoint reads the unique id of a single data point from its attributes and adds it to ds.
func idSetFromDataPoint(ds map[UniqueIDAttrVal]bool, attributes pcommon.Map) error {
	key, exists := attributes.Get(UniqueIDAttrName)
	if !exists {
		return fmt.Errorf("invalid data element, attribute %q is missing", UniqueIDAttrName)
	}
	if key.Type() != pcommon.ValueTypeStr {
		return fmt.Errorf("invalid data element, attribute %q is wrong type %v", UniqueIDAttrName, key.Type())
	}
	ds[UniqueIDAttrVal(key.Str())] = true
	return nil
}
// consume the elements with the specified ids, regardless of the element data type.
func (m *mockConsumer) consume(ids idSet) error {
	m.mux.Lock()
	defer m.mux.Unlock()

	// Consult the user-defined decision function to decide what to do with the data.
	if err := m.consumeDecisionFunc(ids); err != nil {
		// The decision is to return an error to the receiver.
		if consumererror.IsPermanent(err) {
			// It is a permanent error, which means we need to drop the data.
			// Remember the ids of the dropped elements.
			duplicates := m.droppedIDs.merge(ids)
			require.Empty(m.t, duplicates, "elements that were dropped previously were sent again")
		} else {
			// It is a non-permanent error. Don't add it to the drop list. Remember the number of
			// failures to print at the end of the test.
			m.nonPermanentFailures++
		}

		// Return the error to the receiver.
		return err
	}

	// The decision is a success. Remember the ids of the data in the accepted list.
	duplicates := m.acceptedIDs.merge(ids)
	require.Empty(m.t, duplicates, "elements that were accepted previously were sent again")
	return nil
}
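
// For reference, the receiver-side behavior that this mock exercises can be pictured with the
// following hypothetical sketch (not code from this package): on a non-permanent error the
// receiver is expected to resend the same data, while a permanent error means the data must be
// dropped and never resent.
//
//	for {
//		err := nextConsumer.ConsumeLogs(ctx, logs)
//		if err == nil || consumererror.IsPermanent(err) {
//			return err // accepted, or dropped because of a permanent error
//		}
//		// Non-permanent error: back off and retry with the same payload.
//	}
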
// acceptedAndDropped calculates the union of accepted and dropped ids.
// Returns the union and the list of duplicates between the two sets (if any).
func (m *mockConsumer) acceptedAndDropped() (acceptedAndDropped idSet, duplicates []UniqueIDAttrVal) {
	m.mux.Lock()
	defer m.mux.Unlock()
	return m.acceptedIDs.union(m.droppedIDs)
}
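
// CreateOneLogWithID creates a plog.Logs with a single log record whose UniqueIDAttrName
// attribute is set to the given id. It can be used as a helper when implementing Generator.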
func CreateOneLogWithID(id UniqueIDAttrVal) plog.Logs {
	data := plog.NewLogs()
	data.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
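
// CreateGaugeMetricWithID creates a pmetric.Metrics with a single gauge data point whose
// UniqueIDAttrName attribute is set to the given id.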
func CreateGaugeMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
	data := pmetric.NewMetrics()
	gauge := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
	gauge.AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
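
// CreateSumMetricWithID creates a pmetric.Metrics with a single sum data point whose
// UniqueIDAttrName attribute is set to the given id.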
func CreateSumMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
	data := pmetric.NewMetrics()
	sum := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
	sum.AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
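
// CreateSummaryMetricWithID creates a pmetric.Metrics with a single summary data point whose
// UniqueIDAttrName attribute is set to the given id.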
func CreateSummaryMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
	data := pmetric.NewMetrics()
	summary := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
	summary.AppendEmpty().SetEmptySummary().DataPoints().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
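
// CreateHistogramMetricWithID creates a pmetric.Metrics with a single histogram data point
// whose UniqueIDAttrName attribute is set to the given id.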
func CreateHistogramMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
	data := pmetric.NewMetrics()
	histogram := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
	histogram.AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
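
// CreateExponentialHistogramMetricWithID creates a pmetric.Metrics with a single exponential
// histogram data point whose UniqueIDAttrName attribute is set to the given id.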
func CreateExponentialHistogramMetricWithID(id UniqueIDAttrVal) pmetric.Metrics {
	data := pmetric.NewMetrics()
	exponentialHistogram := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
	exponentialHistogram.AppendEmpty().SetEmptyExponentialHistogram().DataPoints().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}
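
// CreateOneSpanWithID creates a ptrace.Traces with a single span whose UniqueIDAttrName
// attribute is set to the given id.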
func CreateOneSpanWithID(id UniqueIDAttrVal) ptrace.Traces {
	data := ptrace.NewTraces()
	data.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes().PutStr(
		UniqueIDAttrName,
		string(id),
	)
	return data
}