opentelemetry-collector/internal/e2e/metric_stability_test.go

367 lines
12 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package e2e
import (
"bufio"
"bytes"
"context"
"fmt"
"net"
"net/http"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/envprovider"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
)
// assertMetrics scrapes http://<metricsAddr>/metrics, parses the response as
// Prometheus text exposition format, and reports whether every metric named
// in expectedMetrics has been observed.
//
// expectedMetrics is mutated: an entry is flipped to true the first time a
// metric family with that name appears in a scrape and is never reset, so
// the function can be polled repeatedly until all entries are true. For each
// expected family that is present, it also asserts that the family carries at
// least one data point.
//
// It returns false (without failing the test) when the endpoint cannot be
// reached, answers with a status other than 200, returns a payload that
// cannot be parsed, or when at least one expected metric is still missing.
func assertMetrics(t *testing.T, metricsAddr string, expectedMetrics map[string]bool) bool {
	// Bound the scrape so a stalled endpoint cannot hang the polling loop.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(fmt.Sprintf("http://%s/metrics", metricsAddr))
	if err != nil {
		return false
	}
	// Register the close before any early return so the body is released on
	// the non-200 path as well.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return false
	}

	reader := bufio.NewReader(resp.Body)
	var parser expfmt.TextParser
	parsed, err := parser.TextToMetricFamilies(reader)
	if err != nil {
		return false
	}

	for metricName, metricFamily := range parsed {
		if _, ok := expectedMetrics[metricName]; ok {
			expectedMetrics[metricName] = true
			assert.GreaterOrEqual(t, len(metricFamily.Metric), 1,
				"metric %s should have at least one data point", metricName)
		}
	}

	for metricName, found := range expectedMetrics {
		if !found {
			t.Logf("expected metric %s was not found", metricName)
			return false
		}
	}
	return true
}
// TestMetricStability checks that the collector exposes a fixed set of
// internal metric names on its Prometheus endpoint, once for a configuration
// without explicit metric readers (served on the default port 8888) and once
// for a configuration with metric readers (served on a freshly picked port).
func TestMetricStability(t *testing.T) {
	// pending builds an expectation set in which every listed metric is
	// initially marked as "not seen yet".
	pending := func(names ...string) map[string]bool {
		set := make(map[string]bool, len(names))
		for _, n := range names {
			set[n] = false
		}
		return set
	}

	withoutReaders := pending(
		// Process metrics
		"otelcol_process_uptime",
		"otelcol_process_cpu_seconds",
		"otelcol_process_memory_rss",
		"otelcol_process_runtime_heap_alloc_bytes",
		"otelcol_process_runtime_total_alloc_bytes",
		"otelcol_process_runtime_total_sys_memory_bytes",
		// Batch processor metrics
		"otelcol_processor_batch_batch_send_size",
		"otelcol_processor_batch_batch_send_size_bytes",
		"otelcol_processor_batch_metadata_cardinality",
		"otelcol_processor_batch_timeout_trigger_send",
		// HTTP server metrics
		"http_server_request_body_size",
		"http_server_request_duration",
		"http_server_response_body_size",
		// Exporter metrics
		"otelcol_exporter_sent_metric_points",
		"otelcol_exporter_send_failed_metric_points",
		"otelcol_exporter_sent_spans",
		"otelcol_exporter_send_failed_spans",
		"otelcol_exporter_sent_log_records",
		"otelcol_exporter_send_failed_log_records",
		// Receiver metrics
		"otelcol_receiver_accepted_metric_points",
		"otelcol_receiver_refused_metric_points",
		"otelcol_receiver_accepted_spans",
		"otelcol_receiver_refused_spans",
		"otelcol_receiver_accepted_log_records",
		"otelcol_receiver_refused_log_records",
		// Other metrics
		"promhttp_metric_handler_errors_total",
		"target_info",
	)

	withReaders := pending(
		// Process metrics
		"otelcol_process_uptime_seconds_total",
		"otelcol_process_cpu_seconds_total",
		"otelcol_process_memory_rss_bytes",
		"otelcol_process_runtime_heap_alloc_bytes",
		"otelcol_process_runtime_total_alloc_bytes_total",
		"otelcol_process_runtime_total_sys_memory_bytes",
		// Batch processor metrics
		"otelcol_processor_batch_batch_send_size",
		"otelcol_processor_batch_batch_send_size_bytes",
		"otelcol_processor_batch_metadata_cardinality",
		"otelcol_processor_batch_timeout_trigger_send_total",
		// HTTP server metrics
		"http_server_request_body_size_bytes",
		"http_server_request_duration_seconds",
		"http_server_response_body_size_bytes",
		// Exporter metrics - Metrics
		"otelcol_exporter_sent_metric_points_total",
		"otelcol_exporter_send_failed_metric_points_total",
		// Exporter metrics - Traces
		"otelcol_exporter_sent_spans_total",
		"otelcol_exporter_send_failed_spans_total",
		// Exporter metrics - Logs
		"otelcol_exporter_sent_log_records_total",
		"otelcol_exporter_send_failed_log_records_total",
		// Receiver metrics
		"otelcol_receiver_accepted_metric_points_total",
		"otelcol_receiver_refused_metric_points_total",
		// Receiver metrics - Traces
		"otelcol_receiver_accepted_spans_total",
		"otelcol_receiver_refused_spans_total",
		// Receiver metrics - Logs
		"otelcol_receiver_accepted_log_records_total",
		"otelcol_receiver_refused_log_records_total",
		// Other metrics
		"promhttp_metric_handler_errors_total",
		"target_info",
	)

	cases := []struct {
		name            string
		configFile      string
		expectedMetrics map[string]bool
		otelPort        string
		metricsPort     string
	}{
		{
			name:            "No metric readers (default)",
			configFile:      "metric_stability_test_no_readers.yaml",
			expectedMetrics: withoutReaders,
			otelPort:        getFreePort(t),
			metricsPort:     "8888", // default metrics port
		},
		{
			name:            "Metric readers",
			configFile:      "metric_stability_test_readers.yaml",
			expectedMetrics: withReaders,
			otelPort:        getFreePort(t),
			metricsPort:     getFreePort(t),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			testMetricStability(t, tc.configFile, tc.expectedMetrics, tc.metricsPort, tc.otelPort)
		})
	}
}
// testMetricStability runs one metric-stability scenario end to end.
//
// It builds a collector from testdata/<configFile> with the OTLP receiver,
// batch processor and debug exporter factories, runs it in the background,
// waits for the metrics endpoint on metricsPort to answer 200, sends five
// rounds of test metrics, traces and logs to otelPort, and then polls the
// metrics endpoint until every entry of expectedMetrics has been observed.
//
// metricsPort and otelPort are exported as the METRICS_PORT and OTEL_PORT
// environment variables for the duration of the test (presumably consumed by
// the config file through the env provider; confirm against testdata).
//
// On return the collector context is cancelled and the function waits, for a
// bounded time, until collector.Run has returned, so the background goroutine
// never touches t after the test has finished.
func testMetricStability(t *testing.T, configFile string, expectedMetrics map[string]bool, metricsPort, otelPort string) {
	// Upper bound on how long teardown waits for the collector to stop.
	const shutdownTimeout = 10 * time.Second

	t.Setenv("METRICS_PORT", metricsPort)
	t.Setenv("OTEL_PORT", otelPort)

	collector, err := otelcol.NewCollector(otelcol.CollectorSettings{
		BuildInfo: component.NewDefaultBuildInfo(),
		Factories: func() (otelcol.Factories, error) {
			return otelcol.Factories{
				Receivers:  map[component.Type]receiver.Factory{otlpreceiver.NewFactory().Type(): otlpreceiver.NewFactory()},
				Processors: map[component.Type]processor.Factory{batchprocessor.NewFactory().Type(): batchprocessor.NewFactory()},
				Exporters:  map[component.Type]exporter.Factory{debugexporter.NewFactory().Type(): debugexporter.NewFactory()},
			}, nil
		},
		ConfigProviderSettings: otelcol.ConfigProviderSettings{
			ResolverSettings: confmap.ResolverSettings{
				URIs: []string{filepath.Join("testdata", configFile)},
				ProviderFactories: []confmap.ProviderFactory{
					fileprovider.NewFactory(),
					yamlprovider.NewFactory(),
					envprovider.NewFactory(),
				},
			},
		},
	})
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	// The channel is buffered so the goroutine can always deliver its result
	// and exit, even if teardown gave up waiting.
	runErr := make(chan error, 1)
	go func() {
		runErr <- collector.Run(ctx)
	}()

	// Deferred functions also run when require.* aborts the test, so the
	// collector is stopped and awaited on every exit path. Logging happens
	// here, on the test goroutine, while t is still valid.
	defer func() {
		cancel()
		select {
		case err := <-runErr:
			if err != nil {
				t.Logf("Collector stopped with error: %v", err)
			}
		case <-time.After(shutdownTimeout):
			t.Logf("Collector did not stop within %s", shutdownTimeout)
		}
	}()

	// Wait until the metrics endpoint is being served.
	require.Eventually(t, func() bool {
		resp, err := http.Get(fmt.Sprintf("http://localhost:%s/metrics", metricsPort))
		if err != nil {
			return false
		}
		resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}, 5*time.Second, 100*time.Millisecond, "collector failed to start")

	for i := 0; i < 5; i++ {
		sendTestData(t, otelPort)
	}

	require.Eventually(t, func() bool {
		return assertMetrics(t, "localhost:"+metricsPort, expectedMetrics)
	}, 10*time.Second, 200*time.Millisecond, "failed to verify metrics")
}
func sendTestData(t *testing.T, otelPort string) {
require.NoError(t, sendTestMetrics(otelPort))
require.NoError(t, sendTestTraces(otelPort))
require.NoError(t, sendTestLogs(otelPort))
}
// sendTestMetrics builds a single gauge metric named "test_metric" with one
// data point (value 42.0, timestamped now), marshals it to protobuf and
// POSTs it to http://localhost:<otelPort>/v1/metrics.
//
// It returns an error when marshaling fails, the request cannot be created
// or sent, or the endpoint answers with a status other than 200 OK.
func sendTestMetrics(otelPort string) error {
	metrics := pmetric.NewMetrics()
	rm := metrics.ResourceMetrics().AppendEmpty()
	sm := rm.ScopeMetrics().AppendEmpty()
	metric := sm.Metrics().AppendEmpty()
	metric.SetName("test_metric")
	metric.SetDescription("test metric")
	metric.SetUnit("1")
	dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	dp.SetDoubleValue(42.0)

	metricsMarshaler := pmetric.ProtoMarshaler{}
	metricsBytes, err := metricsMarshaler.MarshalMetrics(metrics)
	if err != nil {
		return fmt.Errorf("failed to marshal metrics: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/metrics", otelPort), bytes.NewReader(metricsBytes))
	if err != nil {
		return fmt.Errorf("failed to create metrics request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")

	// Bound the request so an unresponsive receiver cannot hang the test.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send metrics: %w", err)
	}
	defer resp.Body.Close()

	// A rejected payload must not be reported as a successful send.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to send metrics: unexpected status %s", resp.Status)
	}
	return nil
}
// sendTestTraces builds a single span named "test_span" with fixed trace and
// span IDs and identical start and end timestamps (now), marshals it to
// protobuf and POSTs it to http://localhost:<otelPort>/v1/traces.
//
// It returns an error when marshaling fails, the request cannot be created
// or sent, or the endpoint answers with a status other than 200 OK.
func sendTestTraces(otelPort string) error {
	traces := ptrace.NewTraces()
	rs := traces.ResourceSpans().AppendEmpty()
	ss := rs.ScopeSpans().AppendEmpty()
	span := ss.Spans().AppendEmpty()
	span.SetName("test_span")
	now := time.Now()
	span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
	span.SetEndTimestamp(pcommon.NewTimestampFromTime(now))
	span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
	span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})

	tracesMarshaler := ptrace.ProtoMarshaler{}
	tracesBytes, err := tracesMarshaler.MarshalTraces(traces)
	if err != nil {
		return fmt.Errorf("failed to marshal traces: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/traces", otelPort), bytes.NewReader(tracesBytes))
	if err != nil {
		return fmt.Errorf("failed to create traces request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")

	// Bound the request so an unresponsive receiver cannot hang the test.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send traces: %w", err)
	}
	defer resp.Body.Close()

	// A rejected payload must not be reported as a successful send.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to send traces: unexpected status %s", resp.Status)
	}
	return nil
}
// sendTestLogs builds a single INFO log record with the body
// "test log message" (timestamped now), marshals it to protobuf and POSTs it
// to http://localhost:<otelPort>/v1/logs.
//
// It returns an error when marshaling fails, the request cannot be created
// or sent, or the endpoint answers with a status other than 200 OK.
func sendTestLogs(otelPort string) error {
	logs := plog.NewLogs()
	rl := logs.ResourceLogs().AppendEmpty()
	sl := rl.ScopeLogs().AppendEmpty()
	// Named "record" rather than "log" so the identifier cannot shadow the
	// standard library package name.
	record := sl.LogRecords().AppendEmpty()
	record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	record.SetSeverityText("INFO")
	record.SetSeverityNumber(plog.SeverityNumberInfo)
	record.Body().SetStr("test log message")

	logsMarshaler := plog.ProtoMarshaler{}
	logsBytes, err := logsMarshaler.MarshalLogs(logs)
	if err != nil {
		return fmt.Errorf("failed to marshal logs: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/logs", otelPort), bytes.NewReader(logsBytes))
	if err != nil {
		return fmt.Errorf("failed to create logs request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")

	// Bound the request so an unresponsive receiver cannot hang the test.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send logs: %w", err)
	}
	defer resp.Body.Close()

	// A rejected payload must not be reported as a successful send.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to send logs: unexpected status %s", resp.Status)
	}
	return nil
}
// getFreePort asks the OS for an unused TCP port on 127.0.0.1 by listening on
// port 0, and returns the assigned port number as a decimal string. The probe
// listener is closed before the function returns, so the port is only free at
// the time of the call, not reserved. The test fails immediately if the
// listener cannot be opened.
func getFreePort(t *testing.T) string {
	t.Helper()

	listener, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		t.Fatalf("could not get free port: %v", listenErr)
	}
	defer listener.Close()

	tcpAddr := listener.Addr().(*net.TCPAddr)
	return strconv.Itoa(tcpAddr.Port)
}