opentelemetry-collector/processor/memorylimiterprocessor/internal/mock_receiver.go

62 lines
1.5 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal"
import (
"context"
"strings"
"sync"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
)
// MockReceiver is a mock receiver that pushes a fixed number of log payloads
// into NextConsumer from a background goroutine and remembers the result of
// the most recent ConsumeLogs call.
type MockReceiver struct {
// ProduceCount is the number of log payloads produce tries to deliver.
ProduceCount int
// NextConsumer receives every payload through its ConsumeLogs method.
NextConsumer consumer.Logs
// lastConsumeResult is the value returned by the most recent ConsumeLogs
// call (nil on success). Access it only while holding mux.
lastConsumeResult error
// mux guards lastConsumeResult, which is written by produce and read by
// LastConsumeResult.
mux sync.Mutex
}
// Start launches produce on a new goroutine and returns immediately, without
// waiting for any data to be delivered.
func (m *MockReceiver) Start() {
	go func() {
		m.produce()
	}()
}
// produce demonstrates how receivers should behave when a
// ConsumeLogs/Traces/Metrics call returns an error: the same data is sent
// again after a retryable error and dropped after a permanent one.
//
// It builds ProduceCount log payloads, hands each one to NextConsumer, and
// stores the outcome of every attempt in lastConsumeResult under mux.
func (m *MockReceiver) produce() {
	for sent := 0; sent < m.ProduceCount; sent++ {
		// Give every payload its own 10 KiB severity text so that each one
		// consumes some memory.
		payload := strings.Repeat("x", 10*1024)
		logs := plog.NewLogs()
		record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
		record.SetSeverityText(payload)

		// Keep offering the same payload to the pipeline until it is either
		// accepted or rejected with a permanent error.
		for {
			result := m.NextConsumer.ConsumeLogs(context.Background(), logs)

			// Remember the result to be used in the tests.
			m.mux.Lock()
			m.lastConsumeResult = result
			m.mux.Unlock()

			if result == nil || consumererror.IsPermanent(result) {
				// Delivered, or failed permanently and therefore dropped.
				break
			}
			// Retryable error: go around again with the same data.
		}
	}
}
// LastConsumeResult returns the value produced by the most recent ConsumeLogs
// call made from produce, or nil if that call succeeded or none has been
// recorded yet. The read is performed while holding mux.
func (m *MockReceiver) LastConsumeResult() error {
	m.mux.Lock()
	latest := m.lastConsumeResult
	m.mux.Unlock()
	return latest
}