Add context handling to sinks in consumertest (#13048)
#### Description This PR enhances the `consumertest.Sink` types (TracesSink, MetricsSink, LogsSink, ProfilesSink) to store and expose request contexts during consumption Changes include: - Added context storage to all sink types - Added `AllContexts()` method to retrieve stored contexts - Updated `Reset()` to clear stored contexts - Added comprehensive tests for context transformation verification #### Link to tracking issue Fixes [#13039](https://github.com/open-telemetry/opentelemetry-collector/issues/13039) #### Testing Added new test cases to verify context handling: - `TestSinkContextTransformation`: Verifies context preservation across all sink types - `TestContextTransformationChain`: Tests complex chains of context transformations - `TestConcurrentContextTransformations`: Ensures thread-safe context handling - Added context verification to existing sink tests #### Documentation
This commit is contained in:
parent
e6c05b8bab
commit
2527d36934
|
|
@ -0,0 +1,25 @@
|
||||||
|
# Use this changelog template to create an entry for release notes.
|
||||||
|
|
||||||
|
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||||
|
change_type: enhancement
|
||||||
|
|
||||||
|
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||||
|
component: consumer/consumertest
|
||||||
|
|
||||||
|
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||||
|
note: Add context to sinks
|
||||||
|
|
||||||
|
# One or more tracking issues or pull requests related to the change
|
||||||
|
issues: [13039]
|
||||||
|
|
||||||
|
# (Optional) One or more lines of additional information to render under the primary note.
|
||||||
|
# These lines will be padded with 2 spaces and then inserted directly into the document.
|
||||||
|
# Use pipe (|) for multiline entries.
|
||||||
|
subtext:
|
||||||
|
|
||||||
|
# Optional: The change log or logs in which this entry should be included.
|
||||||
|
# e.g. '[user]' or '[user, api]'
|
||||||
|
# Include 'user' if the change is relevant to end users.
|
||||||
|
# Include 'api' if there is a change to a library API.
|
||||||
|
# Default: '[user]'
|
||||||
|
change_logs: [api]
|
||||||
|
|
@ -21,17 +21,19 @@ type TracesSink struct {
|
||||||
nonMutatingConsumer
|
nonMutatingConsumer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
traces []ptrace.Traces
|
traces []ptrace.Traces
|
||||||
|
contexts []context.Context
|
||||||
spanCount int
|
spanCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ consumer.Traces = (*TracesSink)(nil)
|
var _ consumer.Traces = (*TracesSink)(nil)
|
||||||
|
|
||||||
// ConsumeTraces stores traces to this sink.
|
// ConsumeTraces stores traces to this sink.
|
||||||
func (ste *TracesSink) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
|
func (ste *TracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||||
ste.mu.Lock()
|
ste.mu.Lock()
|
||||||
defer ste.mu.Unlock()
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
ste.traces = append(ste.traces, td)
|
ste.traces = append(ste.traces, td)
|
||||||
|
ste.contexts = append(ste.contexts, ctx)
|
||||||
ste.spanCount += td.SpanCount()
|
ste.spanCount += td.SpanCount()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -47,6 +49,16 @@ func (ste *TracesSink) AllTraces() []ptrace.Traces {
|
||||||
return copyTraces
|
return copyTraces
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Contexts returns the contexts stored by this sink since last Reset.
|
||||||
|
func (ste *TracesSink) Contexts() []context.Context {
|
||||||
|
ste.mu.Lock()
|
||||||
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
|
copyContexts := make([]context.Context, len(ste.contexts))
|
||||||
|
copy(copyContexts, ste.contexts)
|
||||||
|
return copyContexts
|
||||||
|
}
|
||||||
|
|
||||||
// SpanCount returns the number of spans sent to this sink.
|
// SpanCount returns the number of spans sent to this sink.
|
||||||
func (ste *TracesSink) SpanCount() int {
|
func (ste *TracesSink) SpanCount() int {
|
||||||
ste.mu.Lock()
|
ste.mu.Lock()
|
||||||
|
|
@ -60,6 +72,7 @@ func (ste *TracesSink) Reset() {
|
||||||
defer ste.mu.Unlock()
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
ste.traces = nil
|
ste.traces = nil
|
||||||
|
ste.contexts = nil
|
||||||
ste.spanCount = 0
|
ste.spanCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,17 +82,19 @@ type MetricsSink struct {
|
||||||
nonMutatingConsumer
|
nonMutatingConsumer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
metrics []pmetric.Metrics
|
metrics []pmetric.Metrics
|
||||||
|
contexts []context.Context
|
||||||
dataPointCount int
|
dataPointCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ consumer.Metrics = (*MetricsSink)(nil)
|
var _ consumer.Metrics = (*MetricsSink)(nil)
|
||||||
|
|
||||||
// ConsumeMetrics stores metrics to this sink.
|
// ConsumeMetrics stores metrics to this sink.
|
||||||
func (sme *MetricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
|
func (sme *MetricsSink) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||||
sme.mu.Lock()
|
sme.mu.Lock()
|
||||||
defer sme.mu.Unlock()
|
defer sme.mu.Unlock()
|
||||||
|
|
||||||
sme.metrics = append(sme.metrics, md)
|
sme.metrics = append(sme.metrics, md)
|
||||||
|
sme.contexts = append(sme.contexts, ctx)
|
||||||
sme.dataPointCount += md.DataPointCount()
|
sme.dataPointCount += md.DataPointCount()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -95,6 +110,16 @@ func (sme *MetricsSink) AllMetrics() []pmetric.Metrics {
|
||||||
return copyMetrics
|
return copyMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Contexts returns the contexts stored by this sink since last Reset.
|
||||||
|
func (sme *MetricsSink) Contexts() []context.Context {
|
||||||
|
sme.mu.Lock()
|
||||||
|
defer sme.mu.Unlock()
|
||||||
|
|
||||||
|
copyContexts := make([]context.Context, len(sme.contexts))
|
||||||
|
copy(copyContexts, sme.contexts)
|
||||||
|
return copyContexts
|
||||||
|
}
|
||||||
|
|
||||||
// DataPointCount returns the number of metrics stored by this sink since last Reset.
|
// DataPointCount returns the number of metrics stored by this sink since last Reset.
|
||||||
func (sme *MetricsSink) DataPointCount() int {
|
func (sme *MetricsSink) DataPointCount() int {
|
||||||
sme.mu.Lock()
|
sme.mu.Lock()
|
||||||
|
|
@ -108,6 +133,7 @@ func (sme *MetricsSink) Reset() {
|
||||||
defer sme.mu.Unlock()
|
defer sme.mu.Unlock()
|
||||||
|
|
||||||
sme.metrics = nil
|
sme.metrics = nil
|
||||||
|
sme.contexts = nil
|
||||||
sme.dataPointCount = 0
|
sme.dataPointCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -117,19 +143,20 @@ type LogsSink struct {
|
||||||
nonMutatingConsumer
|
nonMutatingConsumer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
logs []plog.Logs
|
logs []plog.Logs
|
||||||
|
contexts []context.Context
|
||||||
logRecordCount int
|
logRecordCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ consumer.Logs = (*LogsSink)(nil)
|
var _ consumer.Logs = (*LogsSink)(nil)
|
||||||
|
|
||||||
// ConsumeLogs stores logs to this sink.
|
// ConsumeLogs stores logs to this sink.
|
||||||
func (sle *LogsSink) ConsumeLogs(_ context.Context, ld plog.Logs) error {
|
func (sle *LogsSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
|
||||||
sle.mu.Lock()
|
sle.mu.Lock()
|
||||||
defer sle.mu.Unlock()
|
defer sle.mu.Unlock()
|
||||||
|
|
||||||
sle.logs = append(sle.logs, ld)
|
sle.logs = append(sle.logs, ld)
|
||||||
sle.logRecordCount += ld.LogRecordCount()
|
sle.logRecordCount += ld.LogRecordCount()
|
||||||
|
sle.contexts = append(sle.contexts, ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -156,26 +183,39 @@ func (sle *LogsSink) Reset() {
|
||||||
defer sle.mu.Unlock()
|
defer sle.mu.Unlock()
|
||||||
|
|
||||||
sle.logs = nil
|
sle.logs = nil
|
||||||
|
sle.contexts = nil
|
||||||
sle.logRecordCount = 0
|
sle.logRecordCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Contexts returns the contexts stored by this sink since last Reset.
|
||||||
|
func (sle *LogsSink) Contexts() []context.Context {
|
||||||
|
sle.mu.Lock()
|
||||||
|
defer sle.mu.Unlock()
|
||||||
|
|
||||||
|
copyContexts := make([]context.Context, len(sle.contexts))
|
||||||
|
copy(copyContexts, sle.contexts)
|
||||||
|
return copyContexts
|
||||||
|
}
|
||||||
|
|
||||||
// ProfilesSink is a xconsumer.Profiles that acts like a sink that
|
// ProfilesSink is a xconsumer.Profiles that acts like a sink that
|
||||||
// stores all profiles and allows querying them for testing.
|
// stores all profiles and allows querying them for testing.
|
||||||
type ProfilesSink struct {
|
type ProfilesSink struct {
|
||||||
nonMutatingConsumer
|
nonMutatingConsumer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
profiles []pprofile.Profiles
|
profiles []pprofile.Profiles
|
||||||
|
contexts []context.Context
|
||||||
sampleCount int
|
sampleCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ xconsumer.Profiles = (*ProfilesSink)(nil)
|
var _ xconsumer.Profiles = (*ProfilesSink)(nil)
|
||||||
|
|
||||||
// ConsumeProfiles stores profiles to this sink.
|
// ConsumeProfiles stores profiles to this sink.
|
||||||
func (ste *ProfilesSink) ConsumeProfiles(_ context.Context, td pprofile.Profiles) error {
|
func (ste *ProfilesSink) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error {
|
||||||
ste.mu.Lock()
|
ste.mu.Lock()
|
||||||
defer ste.mu.Unlock()
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
ste.profiles = append(ste.profiles, td)
|
ste.profiles = append(ste.profiles, td)
|
||||||
|
ste.contexts = append(ste.contexts, ctx)
|
||||||
ste.sampleCount += td.SampleCount()
|
ste.sampleCount += td.SampleCount()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -204,5 +244,16 @@ func (ste *ProfilesSink) Reset() {
|
||||||
defer ste.mu.Unlock()
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
ste.profiles = nil
|
ste.profiles = nil
|
||||||
|
ste.contexts = nil
|
||||||
ste.sampleCount = 0
|
ste.sampleCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Contexts returns the contexts stored by this sink since last Reset.
|
||||||
|
func (ste *ProfilesSink) Contexts() []context.Context {
|
||||||
|
ste.mu.Lock()
|
||||||
|
defer ste.mu.Unlock()
|
||||||
|
|
||||||
|
copyContexts := make([]context.Context, len(ste.contexts))
|
||||||
|
copy(copyContexts, ste.contexts)
|
||||||
|
return copyContexts
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ package consumertest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
@ -17,6 +19,11 @@ import (
|
||||||
"go.opentelemetry.io/collector/pdata/testdata"
|
"go.opentelemetry.io/collector/pdata/testdata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
ctxKey string
|
||||||
|
testKey int
|
||||||
|
)
|
||||||
|
|
||||||
func TestTracesSink(t *testing.T) {
|
func TestTracesSink(t *testing.T) {
|
||||||
sink := new(TracesSink)
|
sink := new(TracesSink)
|
||||||
td := testdata.GenerateTraces(1)
|
td := testdata.GenerateTraces(1)
|
||||||
|
|
@ -76,3 +83,263 @@ func TestProfilesSink(t *testing.T) {
|
||||||
assert.Empty(t, sink.AllProfiles())
|
assert.Empty(t, sink.AllProfiles())
|
||||||
assert.Empty(t, sink.SampleCount())
|
assert.Empty(t, sink.SampleCount())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTracesSinkWithContext(t *testing.T) {
|
||||||
|
sink := new(TracesSink)
|
||||||
|
td := testdata.GenerateTraces(1)
|
||||||
|
want := make([]ptrace.Traces, 0, 7)
|
||||||
|
wantCtx := make([]context.Context, 0, 7)
|
||||||
|
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ctx := context.WithValue(context.Background(), testKey(i), fmt.Sprintf("value-%d", i))
|
||||||
|
require.NoError(t, sink.ConsumeTraces(ctx, td))
|
||||||
|
want = append(want, td)
|
||||||
|
wantCtx = append(wantCtx, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, want, sink.AllTraces())
|
||||||
|
assert.Equal(t, len(want), sink.SpanCount())
|
||||||
|
|
||||||
|
// Verify contexts
|
||||||
|
gotCtx := sink.Contexts()
|
||||||
|
assert.Len(t, gotCtx, len(wantCtx))
|
||||||
|
for i, ctx := range gotCtx {
|
||||||
|
assert.Equal(t, fmt.Sprintf("value-%d", i), ctx.Value(testKey(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
sink.Reset()
|
||||||
|
assert.Empty(t, sink.AllTraces())
|
||||||
|
assert.Empty(t, sink.Contexts())
|
||||||
|
assert.Equal(t, 0, sink.SpanCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsSinkWithContext(t *testing.T) {
|
||||||
|
sink := new(MetricsSink)
|
||||||
|
md := testdata.GenerateMetrics(1)
|
||||||
|
want := make([]pmetric.Metrics, 0, 7)
|
||||||
|
wantCtx := make([]context.Context, 0, 7)
|
||||||
|
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ctx := context.WithValue(context.Background(), testKey(i), fmt.Sprintf("value-%d", i))
|
||||||
|
require.NoError(t, sink.ConsumeMetrics(ctx, md))
|
||||||
|
want = append(want, md)
|
||||||
|
wantCtx = append(wantCtx, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, want, sink.AllMetrics())
|
||||||
|
assert.Equal(t, 2*len(want), sink.DataPointCount())
|
||||||
|
|
||||||
|
// Verify contexts
|
||||||
|
gotCtx := sink.Contexts()
|
||||||
|
assert.Len(t, gotCtx, len(wantCtx))
|
||||||
|
for i, ctx := range gotCtx {
|
||||||
|
assert.Equal(t, fmt.Sprintf("value-%d", i), ctx.Value(testKey(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
sink.Reset()
|
||||||
|
assert.Empty(t, sink.AllMetrics())
|
||||||
|
assert.Empty(t, sink.Contexts())
|
||||||
|
assert.Equal(t, 0, sink.DataPointCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLogsSinkWithContext(t *testing.T) {
|
||||||
|
sink := new(LogsSink)
|
||||||
|
md := testdata.GenerateLogs(1)
|
||||||
|
want := make([]plog.Logs, 0, 7)
|
||||||
|
wantCtx := make([]context.Context, 0, 7)
|
||||||
|
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ctx := context.WithValue(context.Background(), testKey(i), fmt.Sprintf("value-%d", i))
|
||||||
|
require.NoError(t, sink.ConsumeLogs(ctx, md))
|
||||||
|
want = append(want, md)
|
||||||
|
wantCtx = append(wantCtx, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, want, sink.AllLogs())
|
||||||
|
assert.Equal(t, len(want), sink.LogRecordCount())
|
||||||
|
|
||||||
|
// Verify contexts
|
||||||
|
gotCtx := sink.Contexts()
|
||||||
|
assert.Len(t, gotCtx, len(wantCtx))
|
||||||
|
for i, ctx := range gotCtx {
|
||||||
|
assert.Equal(t, fmt.Sprintf("value-%d", i), ctx.Value(testKey(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
sink.Reset()
|
||||||
|
assert.Empty(t, sink.AllLogs())
|
||||||
|
assert.Empty(t, sink.Contexts())
|
||||||
|
assert.Equal(t, 0, sink.LogRecordCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProfilesSinkWithContext(t *testing.T) {
|
||||||
|
sink := new(ProfilesSink)
|
||||||
|
td := testdata.GenerateProfiles(1)
|
||||||
|
want := make([]pprofile.Profiles, 0, 7)
|
||||||
|
wantCtx := make([]context.Context, 0, 7)
|
||||||
|
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
ctx := context.WithValue(context.Background(), testKey(i), fmt.Sprintf("value-%d", i))
|
||||||
|
require.NoError(t, sink.ConsumeProfiles(ctx, td))
|
||||||
|
want = append(want, td)
|
||||||
|
wantCtx = append(wantCtx, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, want, sink.AllProfiles())
|
||||||
|
assert.Equal(t, len(want), sink.SampleCount())
|
||||||
|
|
||||||
|
// Verify contexts
|
||||||
|
gotCtx := sink.Contexts()
|
||||||
|
assert.Len(t, gotCtx, len(wantCtx))
|
||||||
|
for i, ctx := range gotCtx {
|
||||||
|
assert.Equal(t, fmt.Sprintf("value-%d", i), ctx.Value(testKey(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
sink.Reset()
|
||||||
|
assert.Empty(t, sink.AllProfiles())
|
||||||
|
assert.Empty(t, sink.Contexts())
|
||||||
|
assert.Equal(t, 0, sink.SampleCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSinkContextTransformation verifies that the context is stored and transformed correctly
|
||||||
|
func TestSinkContextTransformation(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
sink interface {
|
||||||
|
Contexts() []context.Context
|
||||||
|
}
|
||||||
|
consumeFunc func(any, context.Context) error
|
||||||
|
testData any
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "TracesSink",
|
||||||
|
sink: new(TracesSink),
|
||||||
|
consumeFunc: func(sink any, ctx context.Context) error {
|
||||||
|
return sink.(*TracesSink).ConsumeTraces(ctx, testdata.GenerateTraces(1))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "MetricsSink",
|
||||||
|
sink: new(MetricsSink),
|
||||||
|
consumeFunc: func(sink any, ctx context.Context) error {
|
||||||
|
return sink.(*MetricsSink).ConsumeMetrics(ctx, testdata.GenerateMetrics(1))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "LogsSink",
|
||||||
|
sink: new(LogsSink),
|
||||||
|
consumeFunc: func(sink any, ctx context.Context) error {
|
||||||
|
return sink.(*LogsSink).ConsumeLogs(ctx, testdata.GenerateLogs(1))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ProfilesSink",
|
||||||
|
sink: new(ProfilesSink),
|
||||||
|
consumeFunc: func(sink any, ctx context.Context) error {
|
||||||
|
return sink.(*ProfilesSink).ConsumeProfiles(ctx, testdata.GenerateProfiles(1))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
// Create a context with initial values
|
||||||
|
initialCtx := context.WithValue(context.Background(), ctxKey("initial-key"), "initial-value")
|
||||||
|
|
||||||
|
// Create a context chain to simulate transformation
|
||||||
|
transformedCtx := context.WithValue(initialCtx, ctxKey("transformed-key"), "transformed-value")
|
||||||
|
|
||||||
|
// Consume data with the transformed context
|
||||||
|
err := tc.consumeFunc(tc.sink, transformedCtx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify context storage and transformation
|
||||||
|
storedContexts := tc.sink.Contexts()
|
||||||
|
assert.Len(t, storedContexts, 1, "Should have stored exactly one context")
|
||||||
|
|
||||||
|
storedCtx := storedContexts[0]
|
||||||
|
// Verify both initial and transformed values are preserved
|
||||||
|
assert.Equal(t, "initial-value", storedCtx.Value(ctxKey("initial-key")),
|
||||||
|
"Initial context value should be preserved")
|
||||||
|
assert.Equal(t, "transformed-value", storedCtx.Value(ctxKey("transformed-key")),
|
||||||
|
"Transformed context value should be stored")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestContextTransformationChain verifies that the context is stored and transformed correctly in a chain of transformations
|
||||||
|
func TestContextTransformationChain(t *testing.T) {
|
||||||
|
sink := new(TracesSink)
|
||||||
|
|
||||||
|
// Create a context transformation chain
|
||||||
|
baseCtx := context.Background()
|
||||||
|
ctx1 := context.WithValue(baseCtx, ctxKey("step1"), "value1")
|
||||||
|
ctx2 := context.WithValue(ctx1, ctxKey("step2"), "value2")
|
||||||
|
ctx3 := context.WithValue(ctx2, ctxKey("step3"), "value3")
|
||||||
|
|
||||||
|
// Consume traces with the transformed context
|
||||||
|
td := testdata.GenerateTraces(1)
|
||||||
|
err := sink.ConsumeTraces(ctx3, td)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify the complete transformation chain
|
||||||
|
storedContexts := sink.Contexts()
|
||||||
|
require.Len(t, storedContexts, 1)
|
||||||
|
|
||||||
|
finalCtx := storedContexts[0]
|
||||||
|
// Verify each transformation step
|
||||||
|
assert.Equal(t, "value1", finalCtx.Value(ctxKey("step1")), "First transformation should be preserved")
|
||||||
|
assert.Equal(t, "value2", finalCtx.Value(ctxKey("step2")), "Second transformation should be preserved")
|
||||||
|
assert.Equal(t, "value3", finalCtx.Value(ctxKey("step3")), "Third transformation should be preserved")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestConcurrentContextTransformations verifies context handling under concurrent operations
|
||||||
|
func TestConcurrentContextTransformations(t *testing.T) {
|
||||||
|
sink := new(TracesSink)
|
||||||
|
const numGoroutines = 10
|
||||||
|
errChan := make(chan error, numGoroutines)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(numGoroutines)
|
||||||
|
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func(idx int) {
|
||||||
|
defer wg.Done()
|
||||||
|
key := ctxKey(fmt.Sprintf("goroutine-%d", idx))
|
||||||
|
value := fmt.Sprintf("value-%d", idx)
|
||||||
|
ctx := context.WithValue(context.Background(), key, value)
|
||||||
|
|
||||||
|
td := testdata.GenerateTraces(1)
|
||||||
|
if err := sink.ConsumeTraces(ctx, td); err != nil {
|
||||||
|
errChan <- err
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(errChan)
|
||||||
|
|
||||||
|
// Check for any errors that occurred in goroutines
|
||||||
|
for err := range errChan {
|
||||||
|
t.Errorf("Error in goroutine: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all contexts were stored correctly
|
||||||
|
storedContexts := sink.Contexts()
|
||||||
|
assert.Len(t, storedContexts, numGoroutines)
|
||||||
|
|
||||||
|
// Create a map to verify all expected values are present
|
||||||
|
contextValues := make(map[string]bool)
|
||||||
|
for _, ctx := range storedContexts {
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
key := ctxKey(fmt.Sprintf("goroutine-%d", i))
|
||||||
|
expectedValue := fmt.Sprintf("value-%d", i)
|
||||||
|
if val := ctx.Value(key); val == expectedValue {
|
||||||
|
contextValues[fmt.Sprintf("goroutine-%d", i)] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all goroutines' contexts were preserved
|
||||||
|
assert.Len(t, contextValues, numGoroutines,
|
||||||
|
"Should have stored contexts from all goroutines")
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue