367 lines
12 KiB
Go
367 lines
12 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package e2e
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"path/filepath"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/common/expfmt"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/confmap"
|
|
"go.opentelemetry.io/collector/confmap/provider/envprovider"
|
|
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
|
|
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
|
|
"go.opentelemetry.io/collector/exporter"
|
|
"go.opentelemetry.io/collector/exporter/debugexporter"
|
|
"go.opentelemetry.io/collector/otelcol"
|
|
"go.opentelemetry.io/collector/pdata/pcommon"
|
|
"go.opentelemetry.io/collector/pdata/plog"
|
|
"go.opentelemetry.io/collector/pdata/pmetric"
|
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
|
"go.opentelemetry.io/collector/processor"
|
|
"go.opentelemetry.io/collector/processor/batchprocessor"
|
|
"go.opentelemetry.io/collector/receiver"
|
|
"go.opentelemetry.io/collector/receiver/otlpreceiver"
|
|
)
|
|
|
|
func assertMetrics(t *testing.T, metricsAddr string, expectedMetrics map[string]bool) bool {
|
|
client := &http.Client{}
|
|
resp, err := client.Get(fmt.Sprintf("http://%s/metrics", metricsAddr))
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return false
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
reader := bufio.NewReader(resp.Body)
|
|
var parser expfmt.TextParser
|
|
parsed, err := parser.TextToMetricFamilies(reader)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
for metricName, metricFamily := range parsed {
|
|
if _, ok := expectedMetrics[metricName]; ok {
|
|
expectedMetrics[metricName] = true
|
|
assert.GreaterOrEqual(t, len(metricFamily.Metric), 1,
|
|
"metric %s should have at least one data point", metricName)
|
|
}
|
|
}
|
|
|
|
for metricName, found := range expectedMetrics {
|
|
if !found {
|
|
t.Logf("expected metric %s was not found", metricName)
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func TestMetricStability(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
configFile string
|
|
expectedMetrics map[string]bool
|
|
otelPort string
|
|
metricsPort string
|
|
}{
|
|
{
|
|
name: "No metric readers (default)",
|
|
configFile: "metric_stability_test_no_readers.yaml",
|
|
expectedMetrics: map[string]bool{
|
|
// Process metrics
|
|
"otelcol_process_uptime": false,
|
|
"otelcol_process_cpu_seconds": false,
|
|
"otelcol_process_memory_rss": false,
|
|
"otelcol_process_runtime_heap_alloc_bytes": false,
|
|
"otelcol_process_runtime_total_alloc_bytes": false,
|
|
"otelcol_process_runtime_total_sys_memory_bytes": false,
|
|
|
|
// Batch processor metrics
|
|
"otelcol_processor_batch_batch_send_size": false,
|
|
"otelcol_processor_batch_batch_send_size_bytes": false,
|
|
"otelcol_processor_batch_metadata_cardinality": false,
|
|
"otelcol_processor_batch_timeout_trigger_send": false,
|
|
|
|
// HTTP server metrics
|
|
"http_server_request_body_size": false,
|
|
"http_server_request_duration": false,
|
|
"http_server_response_body_size": false,
|
|
|
|
// Exporter metrics
|
|
"otelcol_exporter_sent_metric_points": false,
|
|
"otelcol_exporter_send_failed_metric_points": false,
|
|
"otelcol_exporter_sent_spans": false,
|
|
"otelcol_exporter_send_failed_spans": false,
|
|
"otelcol_exporter_sent_log_records": false,
|
|
"otelcol_exporter_send_failed_log_records": false,
|
|
|
|
// Receiver metrics
|
|
"otelcol_receiver_accepted_metric_points": false,
|
|
"otelcol_receiver_refused_metric_points": false,
|
|
"otelcol_receiver_accepted_spans": false,
|
|
"otelcol_receiver_refused_spans": false,
|
|
"otelcol_receiver_accepted_log_records": false,
|
|
"otelcol_receiver_refused_log_records": false,
|
|
|
|
// Other metrics
|
|
"promhttp_metric_handler_errors_total": false,
|
|
"target_info": false,
|
|
},
|
|
otelPort: getFreePort(t),
|
|
metricsPort: "8888", // default metrics port
|
|
},
|
|
{
|
|
name: "Metric readers",
|
|
configFile: "metric_stability_test_readers.yaml",
|
|
expectedMetrics: map[string]bool{
|
|
// Process metrics
|
|
"otelcol_process_uptime_seconds_total": false,
|
|
"otelcol_process_cpu_seconds_total": false,
|
|
"otelcol_process_memory_rss_bytes": false,
|
|
"otelcol_process_runtime_heap_alloc_bytes": false,
|
|
"otelcol_process_runtime_total_alloc_bytes_total": false,
|
|
"otelcol_process_runtime_total_sys_memory_bytes": false,
|
|
|
|
// Batch processor metrics
|
|
"otelcol_processor_batch_batch_send_size": false,
|
|
"otelcol_processor_batch_batch_send_size_bytes": false,
|
|
"otelcol_processor_batch_metadata_cardinality": false,
|
|
"otelcol_processor_batch_timeout_trigger_send_total": false,
|
|
|
|
// HTTP server metrics
|
|
"http_server_request_body_size_bytes": false,
|
|
"http_server_request_duration_seconds": false,
|
|
"http_server_response_body_size_bytes": false,
|
|
|
|
// Exporter metrics - Metrics
|
|
"otelcol_exporter_sent_metric_points_total": false,
|
|
"otelcol_exporter_send_failed_metric_points_total": false,
|
|
|
|
// Exporter metrics - Traces
|
|
"otelcol_exporter_sent_spans_total": false,
|
|
"otelcol_exporter_send_failed_spans_total": false,
|
|
|
|
// Exporter metrics - Logs
|
|
"otelcol_exporter_sent_log_records_total": false,
|
|
"otelcol_exporter_send_failed_log_records_total": false,
|
|
|
|
// Receiver metrics
|
|
"otelcol_receiver_accepted_metric_points_total": false,
|
|
"otelcol_receiver_refused_metric_points_total": false,
|
|
|
|
// Receiver metrics - Traces
|
|
"otelcol_receiver_accepted_spans_total": false,
|
|
"otelcol_receiver_refused_spans_total": false,
|
|
|
|
// Receiver metrics - Logs
|
|
"otelcol_receiver_accepted_log_records_total": false,
|
|
"otelcol_receiver_refused_log_records_total": false,
|
|
|
|
// Other metrics
|
|
"promhttp_metric_handler_errors_total": false,
|
|
"target_info": false,
|
|
},
|
|
otelPort: getFreePort(t),
|
|
metricsPort: getFreePort(t),
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
testMetricStability(t, test.configFile, test.expectedMetrics, test.metricsPort, test.otelPort)
|
|
})
|
|
}
|
|
}
|
|
|
|
func testMetricStability(t *testing.T, configFile string, expectedMetrics map[string]bool, metricsPort, otelPort string) {
|
|
t.Setenv("METRICS_PORT", metricsPort)
|
|
t.Setenv("OTEL_PORT", otelPort)
|
|
|
|
collector, err := otelcol.NewCollector(otelcol.CollectorSettings{
|
|
BuildInfo: component.NewDefaultBuildInfo(),
|
|
Factories: func() (otelcol.Factories, error) {
|
|
return otelcol.Factories{
|
|
Receivers: map[component.Type]receiver.Factory{otlpreceiver.NewFactory().Type(): otlpreceiver.NewFactory()},
|
|
Processors: map[component.Type]processor.Factory{batchprocessor.NewFactory().Type(): batchprocessor.NewFactory()},
|
|
Exporters: map[component.Type]exporter.Factory{debugexporter.NewFactory().Type(): debugexporter.NewFactory()},
|
|
}, nil
|
|
},
|
|
ConfigProviderSettings: otelcol.ConfigProviderSettings{
|
|
ResolverSettings: confmap.ResolverSettings{
|
|
URIs: []string{filepath.Join("testdata", configFile)},
|
|
ProviderFactories: []confmap.ProviderFactory{
|
|
fileprovider.NewFactory(),
|
|
yamlprovider.NewFactory(),
|
|
envprovider.NewFactory(),
|
|
},
|
|
},
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
go func() {
|
|
err := collector.Run(ctx)
|
|
if err != nil {
|
|
t.Logf("Collector stopped with error: %v", err)
|
|
}
|
|
}()
|
|
|
|
require.Eventually(t, func() bool {
|
|
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/metrics", metricsPort))
|
|
if err != nil {
|
|
return false
|
|
}
|
|
resp.Body.Close()
|
|
return resp.StatusCode == http.StatusOK
|
|
}, 5*time.Second, 100*time.Millisecond, "collector failed to start")
|
|
|
|
for i := 0; i < 5; i++ {
|
|
sendTestData(t, otelPort)
|
|
}
|
|
|
|
require.Eventually(t, func() bool {
|
|
return assertMetrics(t, "localhost:"+metricsPort, expectedMetrics)
|
|
}, 10*time.Second, 200*time.Millisecond, "failed to verify metrics")
|
|
}
|
|
|
|
func sendTestData(t *testing.T, otelPort string) {
|
|
require.NoError(t, sendTestMetrics(otelPort))
|
|
require.NoError(t, sendTestTraces(otelPort))
|
|
require.NoError(t, sendTestLogs(otelPort))
|
|
}
|
|
|
|
func sendTestMetrics(otelPort string) error {
|
|
metrics := pmetric.NewMetrics()
|
|
rm := metrics.ResourceMetrics().AppendEmpty()
|
|
sm := rm.ScopeMetrics().AppendEmpty()
|
|
metric := sm.Metrics().AppendEmpty()
|
|
metric.SetName("test_metric")
|
|
metric.SetDescription("test metric")
|
|
metric.SetUnit("1")
|
|
dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
|
|
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
|
|
dp.SetDoubleValue(42.0)
|
|
|
|
client := &http.Client{}
|
|
|
|
metricsMarshaler := pmetric.ProtoMarshaler{}
|
|
metricsBytes, err := metricsMarshaler.MarshalMetrics(metrics)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal metrics: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/metrics", otelPort), bytes.NewReader(metricsBytes))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create metrics request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send metrics: %w", err)
|
|
}
|
|
resp.Body.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func sendTestTraces(otelPort string) error {
|
|
traces := ptrace.NewTraces()
|
|
rs := traces.ResourceSpans().AppendEmpty()
|
|
ss := rs.ScopeSpans().AppendEmpty()
|
|
span := ss.Spans().AppendEmpty()
|
|
span.SetName("test_span")
|
|
now := time.Now()
|
|
span.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
|
|
span.SetEndTimestamp(pcommon.NewTimestampFromTime(now))
|
|
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
|
|
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
|
|
|
|
client := &http.Client{}
|
|
|
|
tracesMarshaler := ptrace.ProtoMarshaler{}
|
|
tracesBytes, err := tracesMarshaler.MarshalTraces(traces)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal traces: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/traces", otelPort), bytes.NewReader(tracesBytes))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create traces request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send traces: %w", err)
|
|
}
|
|
resp.Body.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func sendTestLogs(otelPort string) error {
|
|
logs := plog.NewLogs()
|
|
rl := logs.ResourceLogs().AppendEmpty()
|
|
sl := rl.ScopeLogs().AppendEmpty()
|
|
log := sl.LogRecords().AppendEmpty()
|
|
log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
|
|
log.SetSeverityText("INFO")
|
|
log.SetSeverityNumber(plog.SeverityNumberInfo)
|
|
log.Body().SetStr("test log message")
|
|
|
|
client := &http.Client{}
|
|
|
|
logsMarshaler := plog.ProtoMarshaler{}
|
|
logsBytes, err := logsMarshaler.MarshalLogs(logs)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal logs: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/v1/logs", otelPort), bytes.NewReader(logsBytes))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create logs request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send logs: %w", err)
|
|
}
|
|
resp.Body.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func getFreePort(t *testing.T) string {
|
|
t.Helper()
|
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("could not get free port: %v", err)
|
|
}
|
|
defer l.Close()
|
|
return strconv.Itoa(l.Addr().(*net.TCPAddr).Port)
|
|
}
|