opentelemetry-collector/exporter/otlphttpexporter/otlp_test.go

477 lines
14 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttpexporter
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/testutil"
)
func TestInvalidConfig(t *testing.T) {
config := &Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "",
},
}
f := NewFactory()
set := componenttest.NewNopExporterCreateSettings()
_, err := f.CreateTracesExporter(context.Background(), set, config)
require.Error(t, err)
_, err = f.CreateMetricsExporter(context.Background(), set, config)
require.Error(t, err)
_, err = f.CreateLogsExporter(context.Background(), set, config)
require.Error(t, err)
}
func TestTraceNoBackend(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
exp := startTracesExporter(t, "", fmt.Sprintf("http://%s/v1/traces", addr))
td := testdata.GenerateTracesOneSpan()
assert.Error(t, exp.ConsumeTraces(context.Background(), td))
}
func TestTraceInvalidUrl(t *testing.T) {
exp := startTracesExporter(t, "http:/\\//this_is_an/*/invalid_url", "")
td := testdata.GenerateTracesOneSpan()
assert.Error(t, exp.ConsumeTraces(context.Background(), td))
exp = startTracesExporter(t, "", "http:/\\//this_is_an/*/invalid_url")
td = testdata.GenerateTracesOneSpan()
assert.Error(t, exp.ConsumeTraces(context.Background(), td))
}
func TestTraceError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
startTracesReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startTracesExporter(t, "", fmt.Sprintf("http://%s/v1/traces", addr))
td := testdata.GenerateTracesOneSpan()
assert.Error(t, exp.ConsumeTraces(context.Background(), td))
}
func TestTraceRoundTrip(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
baseURL string
overrideURL string
}{
{
name: "wrongbase",
baseURL: "http://wronghostname",
overrideURL: fmt.Sprintf("http://%s/v1/traces", addr),
},
{
name: "onlybase",
baseURL: fmt.Sprintf("http://%s", addr),
overrideURL: "",
},
{
name: "override",
baseURL: "",
overrideURL: fmt.Sprintf("http://%s/v1/traces", addr),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTracesReceiver(t, addr, sink)
exp := startTracesExporter(t, test.baseURL, test.overrideURL)
td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpanCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}
func TestCompressionOptions(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
baseURL string
compression string
err bool
}{
{
name: "no compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "",
},
{
name: "gzip",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip",
},
{
name: "incorrect compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip2",
err: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTracesReceiver(t, addr, sink)
factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, _ := factory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
err := exp.Start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
if test.err {
require.Error(t, err)
return
}
td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpanCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}
func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
startMetricsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startMetricsExporter(t, "", fmt.Sprintf("http://%s/v1/metrics", addr))
md := testdata.GenerateMetricsOneMetric()
assert.Error(t, exp.ConsumeMetrics(context.Background(), md))
}
func TestMetricsRoundTrip(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
baseURL string
overrideURL string
}{
{
name: "wrongbase",
baseURL: "http://wronghostname",
overrideURL: fmt.Sprintf("http://%s/v1/metrics", addr),
},
{
name: "onlybase",
baseURL: fmt.Sprintf("http://%s", addr),
overrideURL: "",
},
{
name: "override",
baseURL: "",
overrideURL: fmt.Sprintf("http://%s/v1/metrics", addr),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.MetricsSink)
startMetricsReceiver(t, addr, sink)
exp := startMetricsExporter(t, test.baseURL, test.overrideURL)
md := testdata.GenerateMetricsOneMetric()
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))
require.Eventually(t, func() bool {
return sink.MetricsCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allMetrics := sink.AllMetrics()
require.Len(t, allMetrics, 1)
assert.EqualValues(t, md, allMetrics[0])
})
}
}
func TestLogsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
startLogsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startLogsExporter(t, "", fmt.Sprintf("http://%s/v1/logs", addr))
md := testdata.GenerateLogsOneLogRecord()
assert.Error(t, exp.ConsumeLogs(context.Background(), md))
}
func TestLogsRoundTrip(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
baseURL string
overrideURL string
}{
{
name: "wrongbase",
baseURL: "http://wronghostname",
overrideURL: fmt.Sprintf("http://%s/v1/logs", addr),
},
{
name: "onlybase",
baseURL: fmt.Sprintf("http://%s", addr),
overrideURL: "",
},
{
name: "override",
baseURL: "",
overrideURL: fmt.Sprintf("http://%s/v1/logs", addr),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.LogsSink)
startLogsReceiver(t, addr, sink)
exp := startLogsExporter(t, test.baseURL, test.overrideURL)
md := testdata.GenerateLogsOneLogRecord()
assert.NoError(t, exp.ConsumeLogs(context.Background(), md))
require.Eventually(t, func() bool {
return sink.LogRecordsCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allLogs := sink.AllLogs()
require.Len(t, allLogs, 1)
assert.EqualValues(t, md, allLogs[0])
})
}
}
func startTracesExporter(t *testing.T, baseURL string, overrideURL string) component.TracesExporter {
factory := NewFactory()
cfg := createExporterConfig(baseURL, factory.CreateDefaultConfig())
cfg.TracesEndpoint = overrideURL
exp, err := factory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
startAndCleanup(t, exp)
return exp
}
func startMetricsExporter(t *testing.T, baseURL string, overrideURL string) component.MetricsExporter {
factory := NewFactory()
cfg := createExporterConfig(baseURL, factory.CreateDefaultConfig())
cfg.MetricsEndpoint = overrideURL
exp, err := factory.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
startAndCleanup(t, exp)
return exp
}
func startLogsExporter(t *testing.T, baseURL string, overrideURL string) component.LogsExporter {
factory := NewFactory()
cfg := createExporterConfig(baseURL, factory.CreateDefaultConfig())
cfg.LogsEndpoint = overrideURL
exp, err := factory.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
startAndCleanup(t, exp)
return exp
}
func createExporterConfig(baseURL string, defaultCfg config.Exporter) *Config {
cfg := defaultCfg.(*Config)
cfg.Endpoint = baseURL
cfg.QueueSettings.Enabled = false
cfg.RetrySettings.Enabled = false
return cfg
}
func startTracesReceiver(t *testing.T, addr string, next consumer.Traces) {
factory := otlpreceiver.NewFactory()
cfg := createReceiverConfig(addr, factory.CreateDefaultConfig())
recv, err := factory.CreateTracesReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, next)
require.NoError(t, err)
startAndCleanup(t, recv)
}
func startMetricsReceiver(t *testing.T, addr string, next consumer.Metrics) {
factory := otlpreceiver.NewFactory()
cfg := createReceiverConfig(addr, factory.CreateDefaultConfig())
recv, err := factory.CreateMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, next)
require.NoError(t, err)
startAndCleanup(t, recv)
}
func startLogsReceiver(t *testing.T, addr string, next consumer.Logs) {
factory := otlpreceiver.NewFactory()
cfg := createReceiverConfig(addr, factory.CreateDefaultConfig())
recv, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, next)
require.NoError(t, err)
startAndCleanup(t, recv)
}
func createReceiverConfig(addr string, defaultCfg config.Receiver) *otlpreceiver.Config {
cfg := defaultCfg.(*otlpreceiver.Config)
cfg.HTTP.Endpoint = addr
cfg.GRPC = nil
return cfg
}
func startAndCleanup(t *testing.T, cmp component.Component) {
require.NoError(t, cmp.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, cmp.Shutdown(context.Background()))
})
}
func TestErrorResponses(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
errMsgPrefix := fmt.Sprintf("error exporting items, request to http://%s/v1/traces responded with HTTP Status Code ", addr)
tests := []struct {
name string
responseStatus int
responseBody *status.Status
err error
isPermErr bool
headers map[string]string
}{
{
name: "400",
responseStatus: http.StatusBadRequest,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
},
{
name: "404",
responseStatus: http.StatusNotFound,
err: fmt.Errorf(errMsgPrefix + "404"),
},
{
name: "419",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
err: exporterhelper.NewThrottleRetry(
fmt.Errorf(errMsgPrefix+"429, Message=Quota exceeded, Details=[]"),
time.Duration(0)*time.Second),
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: exporterhelper.NewThrottleRetry(
fmt.Errorf(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
time.Duration(0)*time.Second),
},
{
name: "503-Retry-After",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
err: exporterhelper.NewThrottleRetry(
fmt.Errorf(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
time.Duration(30)*time.Second),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) {
for k, v := range test.headers {
writer.Header().Add(k, v)
}
writer.WriteHeader(test.responseStatus)
if test.responseBody != nil {
msg, err := proto.Marshal(test.responseBody.Proto())
require.NoError(t, err)
_, err = writer.Write(msg)
require.NoError(t, err)
}
})
srv := http.Server{
Addr: addr,
Handler: mux,
}
ln, err := net.Listen("tcp", addr)
require.NoError(t, err)
go func() {
_ = srv.Serve(ln)
}()
cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewID(typeStr)),
TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr),
// Create without QueueSettings and RetrySettings so that ConsumeTraces
// returns the errors that we want to check immediately.
}
exp, err := createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate traces
traces := pdata.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
assert.Error(t, err)
if test.isPermErr {
assert.True(t, consumererror.IsPermanent(err))
} else {
assert.EqualValues(t, test.err, err)
}
srv.Close()
})
}
}