opentelemetry-collector/exporter/otlphttpexporter/otlp_test.go

1177 lines
35 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlphttpexporter
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlphttpexporter/internal/metadata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/pprofile/pprofileotlp"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)
const (
tracesTelemetryType = "traces"
metricsTelemetryType = "metrics"
logsTelemetryType = "logs"
profilesTelemetryType = "profiles"
)
type responseSerializer interface {
MarshalJSON() ([]byte, error)
MarshalProto() ([]byte, error)
}
type responseSerializerProvider = func() responseSerializer
func provideTracesResponseSerializer() responseSerializer {
response := ptraceotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedSpans(1)
return response
}
func provideMetricsResponseSerializer() responseSerializer {
response := pmetricotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedDataPoints(1)
return response
}
func provideLogsResponseSerializer() responseSerializer {
response := plogotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedLogRecords(1)
return response
}
func provideProfilesResponseSerializer() responseSerializer {
response := pprofileotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedProfiles(1)
return response
}
func TestErrorResponses(t *testing.T) {
errMsgPrefix := func(srv *httptest.Server) string {
return fmt.Sprintf("error exporting items, request to %s/v1/traces responded with HTTP Status Code ", srv.URL)
}
tests := []struct {
name string
responseStatus int
responseBody *status.Status
checkErr func(t *testing.T, err error, srv *httptest.Server)
headers map[string]string
}{
{
name: "400",
responseStatus: http.StatusBadRequest,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "402",
responseStatus: http.StatusPaymentRequired,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "404",
responseStatus: http.StatusNotFound,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "405",
responseStatus: http.StatusMethodNotAllowed,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "413",
responseStatus: http.StatusRequestEntityTooLarge,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "414",
responseStatus: http.StatusRequestURITooLong,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "431",
responseStatus: http.StatusRequestHeaderFieldsTooLarge,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "429",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.ResourceExhausted, "Quota exceeded"),
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "429-Retry-After",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
headers: map[string]string{"Retry-After": "Mon, 09 Feb 2025 15:04:05 GMT"},
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
// Cannot test for the delay part since it depends on now. Check first part (which has a negative duration) and last part:
require.ErrorContains(t, err, "Throttle (-")
require.ErrorContains(t, err, "), error: "+status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "429-Retry-After-Malformed",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
headers: map[string]string{"Retry-After": "Malformed"},
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
// Cannot test for the delay part since it depends on now. Check first part (which has a negative duration) and last part:
require.EqualError(t, err, status.New(codes.ResourceExhausted, errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]").String())
},
},
{
name: "500",
responseStatus: http.StatusInternalServerError,
responseBody: status.New(codes.InvalidArgument, "Internal server error"),
checkErr: func(t *testing.T, err error, _ *httptest.Server) {
assert.True(t, consumererror.IsPermanent(err))
},
},
{
name: "502",
responseStatus: http.StatusBadGateway,
responseBody: status.New(codes.InvalidArgument, "Bad gateway"),
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]").String())
},
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").String())
},
},
{
name: "503-Retry-After",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, exporterhelper.NewThrottleRetry(
status.New(codes.Unavailable, errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]").Err(),
time.Duration(30)*time.Second).Error())
},
},
{
name: "504",
responseStatus: http.StatusGatewayTimeout,
responseBody: status.New(codes.InvalidArgument, "Gateway timeout"),
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]").String())
},
},
{
name: "Bad response payload",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, strings.Repeat("a", maxHTTPResponseReadBytes+1)),
checkErr: func(t *testing.T, err error, srv *httptest.Server) {
require.EqualError(t, err, status.New(codes.Unavailable, errMsgPrefix(srv)+"503").String())
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
srv := createBackend("/v1/traces", func(writer http.ResponseWriter, _ *http.Request) {
for k, v := range test.headers {
writer.Header().Add(k, v)
}
writer.WriteHeader(test.responseStatus)
if test.responseBody != nil {
msg, err := proto.Marshal(test.responseBody.Proto())
assert.NoError(t, err)
_, err = writer.Write(msg)
assert.NoError(t, err)
}
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
TracesEndpoint: srv.URL + "/v1/traces",
// Create without QueueConfig and RetryConfig so that ConsumeTraces
// returns the errors that we want to check immediately.
}
exp, err := createTraces(context.Background(), exportertest.NewNopSettings(metadata.Type), cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate traces
traces := ptrace.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
require.Error(t, err)
test.checkErr(t, err, srv)
})
}
}
func TestErrorResponseInvalidResponseBody(t *testing.T) {
resp := &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(badReader{}),
ContentLength: 100,
}
assert.Nil(t, readResponseStatus(resp))
}
func TestUserAgent(t *testing.T) {
set := exportertest.NewNopSettings(metadata.Type)
set.BuildInfo.Description = "Collector"
set.BuildInfo.Version = "1.2.3test"
tests := []struct {
name string
headers map[string]configopaque.String
expectedUA string
}{
{
name: "default_user_agent",
expectedUA: "Collector/1.2.3test",
},
{
name: "custom_user_agent",
headers: map[string]configopaque.String{"User-Agent": "My Custom Agent"},
expectedUA: "My Custom Agent",
},
{
name: "custom_user_agent_lowercase",
headers: map[string]configopaque.String{"user-agent": "My Custom Agent"},
expectedUA: "My Custom Agent",
},
}
t.Run("traces", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("user-agent"), tt.expectedUA)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
TracesEndpoint: srv.URL + "/v1/traces",
ClientConfig: confighttp.ClientConfig{
Headers: tt.headers,
},
}
exp, err := createTraces(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
traces := ptrace.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
require.NoError(t, err)
})
}
})
t.Run("metrics", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("user-agent"), tt.expectedUA)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
MetricsEndpoint: srv.URL + "/v1/metrics",
ClientConfig: confighttp.ClientConfig{
Headers: tt.headers,
},
}
exp, err := createMetrics(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
metrics := pmetric.NewMetrics()
err = exp.ConsumeMetrics(context.Background(), metrics)
require.NoError(t, err)
})
}
})
t.Run("logs", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("user-agent"), tt.expectedUA)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
LogsEndpoint: srv.URL + "/v1/logs",
ClientConfig: confighttp.ClientConfig{
Headers: tt.headers,
},
}
exp, err := createLogs(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
logs := plog.NewLogs()
err = exp.ConsumeLogs(context.Background(), logs)
require.NoError(t, err)
srv.Close()
})
}
})
t.Run("profiles", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
srv := createBackend("/v1development/profiles", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
ClientConfig: confighttp.ClientConfig{
Endpoint: srv.URL,
Headers: test.headers,
},
}
exp, err := createProfiles(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
profiles := pprofile.NewProfiles()
err = exp.ConsumeProfiles(context.Background(), profiles)
require.NoError(t, err)
})
}
})
}
func TestPartialSuccessInvalidBody(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
invalidBodyCases := []struct {
telemetryType string
handler partialSuccessHandler
}{
{
telemetryType: "traces",
handler: exp.tracesPartialSuccessHandler,
},
{
telemetryType: "metrics",
handler: exp.metricsPartialSuccessHandler,
},
{
telemetryType: "logs",
handler: exp.logsPartialSuccessHandler,
},
{
telemetryType: "profiles",
handler: exp.profilesPartialSuccessHandler,
},
}
for _, tt := range invalidBodyCases {
t.Run("Invalid response body_"+tt.telemetryType, func(t *testing.T) {
err := tt.handler([]byte{1}, "application/x-protobuf")
assert.ErrorContains(t, err, "error parsing protobuf response:")
})
}
}
func TestPartialSuccessUnsupportedContentType(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
unsupportedContentTypeCases := []struct {
contentType string
}{
{
contentType: "text/plain",
},
{
contentType: "application/octet-stream",
},
}
for _, telemetryType := range []string{"logs", "metrics", "traces", "profiles"} {
for _, tt := range unsupportedContentTypeCases {
t.Run("Unsupported content type "+tt.contentType+" "+telemetryType, func(t *testing.T) {
var handler func(b []byte, contentType string) error
switch telemetryType {
case "logs":
handler = exp.logsPartialSuccessHandler
case "metrics":
handler = exp.metricsPartialSuccessHandler
case "traces":
handler = exp.tracesPartialSuccessHandler
case "profiles":
handler = exp.profilesPartialSuccessHandler
default:
panic(telemetryType)
}
exportResponse := ptraceotlp.NewExportResponse()
exportResponse.PartialSuccess().SetErrorMessage("foo")
exportResponse.PartialSuccess().SetRejectedSpans(42)
b, err := exportResponse.MarshalProto()
require.NoError(t, err)
err = handler(b, tt.contentType)
assert.NoError(t, err)
})
}
}
}
func TestPartialSuccess_logs(t *testing.T) {
srv := createBackend("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) {
response := plogotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedLogRecords(1)
b, err := response.MarshalProto()
assert.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(b)
assert.NoError(t, err)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
LogsEndpoint: srv.URL + "/v1/logs",
ClientConfig: confighttp.ClientConfig{},
}
set := exportertest.NewNopSettings(metadata.Type)
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
exp, err := createLogs(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
logs := plog.NewLogs()
err = exp.ConsumeLogs(context.Background(), logs)
require.NoError(t, err)
require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1)
require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success")
}
func TestPartialResponse_missingHeaderButHasBody(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
contentTypes := []struct {
contentType string
}{
{contentType: protobufContentType},
{contentType: jsonContentType},
}
telemetryTypes := []struct {
telemetryType string
handler partialSuccessHandler
serializer responseSerializerProvider
}{
{
telemetryType: tracesTelemetryType,
handler: exp.tracesPartialSuccessHandler,
serializer: provideTracesResponseSerializer,
},
{
telemetryType: metricsTelemetryType,
handler: exp.metricsPartialSuccessHandler,
serializer: provideMetricsResponseSerializer,
},
{
telemetryType: logsTelemetryType,
handler: exp.logsPartialSuccessHandler,
serializer: provideLogsResponseSerializer,
},
{
telemetryType: profilesTelemetryType,
handler: exp.profilesPartialSuccessHandler,
serializer: provideProfilesResponseSerializer,
},
}
for _, ct := range contentTypes {
for _, tt := range telemetryTypes {
t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) {
serializer := tt.serializer()
var data []byte
var err error
switch ct.contentType {
case jsonContentType:
data, err = serializer.MarshalJSON()
case protobufContentType:
data, err = serializer.MarshalProto()
default:
require.Failf(t, "unsupported content type: %s", ct.contentType)
}
require.NoError(t, err)
resp := &http.Response{
// `-1` indicates a missing Content-Length header in the Go http standard library
ContentLength: -1,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {ct.contentType},
},
}
err = handlePartialSuccessResponse(resp, tt.handler)
assert.NoError(t, err)
})
}
}
}
func TestPartialResponse_missingHeaderAndBody(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
contentTypes := []struct {
contentType string
}{
{contentType: protobufContentType},
{contentType: jsonContentType},
}
telemetryTypes := []struct {
telemetryType string
handler partialSuccessHandler
}{
{
telemetryType: tracesTelemetryType,
handler: exp.tracesPartialSuccessHandler,
},
{
telemetryType: metricsTelemetryType,
handler: exp.metricsPartialSuccessHandler,
},
{
telemetryType: logsTelemetryType,
handler: exp.logsPartialSuccessHandler,
},
{
telemetryType: profilesTelemetryType,
handler: exp.profilesPartialSuccessHandler,
},
}
for _, ct := range contentTypes {
for _, tt := range telemetryTypes {
t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) {
resp := &http.Response{
// `-1` indicates a missing Content-Length header in the Go http standard library
ContentLength: -1,
Body: io.NopCloser(bytes.NewReader([]byte{})),
Header: map[string][]string{
"Content-Type": {ct.contentType},
},
}
err = handlePartialSuccessResponse(resp, tt.handler)
assert.NoError(t, err)
})
}
}
}
func TestPartialResponse_nonErrUnexpectedEOFError(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
resp := &http.Response{
// `-1` indicates a missing Content-Length header in the Go http standard library
ContentLength: -1,
Body: io.NopCloser(badReader{}),
}
err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler)
assert.Error(t, err)
}
func TestPartialSuccess_shortContentLengthHeader(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
contentTypes := []struct {
contentType string
}{
{contentType: protobufContentType},
{contentType: jsonContentType},
}
telemetryTypes := []struct {
telemetryType string
handler partialSuccessHandler
serializer responseSerializerProvider
}{
{
telemetryType: tracesTelemetryType,
handler: exp.tracesPartialSuccessHandler,
serializer: provideTracesResponseSerializer,
},
{
telemetryType: metricsTelemetryType,
handler: exp.metricsPartialSuccessHandler,
serializer: provideMetricsResponseSerializer,
},
{
telemetryType: logsTelemetryType,
handler: exp.logsPartialSuccessHandler,
serializer: provideLogsResponseSerializer,
},
{
telemetryType: profilesTelemetryType,
handler: exp.profilesPartialSuccessHandler,
serializer: provideProfilesResponseSerializer,
},
}
for _, ct := range contentTypes {
for _, tt := range telemetryTypes {
t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) {
serializer := tt.serializer()
var data []byte
var err error
switch ct.contentType {
case jsonContentType:
data, err = serializer.MarshalJSON()
case protobufContentType:
data, err = serializer.MarshalProto()
default:
require.Failf(t, "unsupported content type: %s", ct.contentType)
}
require.NoError(t, err)
resp := &http.Response{
ContentLength: 3,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {ct.contentType},
},
}
// For short content-length, a real error happens.
err = handlePartialSuccessResponse(resp, tt.handler)
assert.Error(t, err)
})
}
}
}
func TestPartialSuccess_longContentLengthHeader(t *testing.T) {
contentTypes := []struct {
contentType string
}{
{contentType: protobufContentType},
{contentType: jsonContentType},
}
telemetryTypes := []struct {
telemetryType string
serializer responseSerializerProvider
}{
{
telemetryType: tracesTelemetryType,
serializer: provideTracesResponseSerializer,
},
{
telemetryType: metricsTelemetryType,
serializer: provideMetricsResponseSerializer,
},
{
telemetryType: logsTelemetryType,
serializer: provideLogsResponseSerializer,
},
{
telemetryType: profilesTelemetryType,
serializer: provideProfilesResponseSerializer,
},
}
for _, ct := range contentTypes {
for _, tt := range telemetryTypes {
t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
serializer := tt.serializer()
var handler partialSuccessHandler
switch tt.telemetryType {
case tracesTelemetryType:
handler = exp.tracesPartialSuccessHandler
case metricsTelemetryType:
handler = exp.metricsPartialSuccessHandler
case logsTelemetryType:
handler = exp.logsPartialSuccessHandler
case profilesTelemetryType:
handler = exp.profilesPartialSuccessHandler
default:
require.Failf(t, "unsupported telemetry type: %s", ct.contentType)
}
var data []byte
switch ct.contentType {
case jsonContentType:
data, err = serializer.MarshalJSON()
case protobufContentType:
data, err = serializer.MarshalProto()
default:
require.Failf(t, "unsupported content type: %s", ct.contentType)
}
require.NoError(t, err)
resp := &http.Response{
ContentLength: 4096,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {ct.contentType},
},
}
// No real error happens for long content length, so the partial
// success is handled as success with a warning.
err = handlePartialSuccessResponse(resp, handler)
require.NoError(t, err)
assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1)
assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success")
})
}
}
}
func TestPartialSuccessInvalidResponseBody(t *testing.T) {
cfg := createDefaultConfig()
set := exportertest.NewNopSettings(metadata.Type)
exp, err := newExporter(cfg, set)
require.NoError(t, err)
resp := &http.Response{
Body: io.NopCloser(badReader{}),
ContentLength: 100,
Header: map[string][]string{
"Content-Type": {protobufContentType},
},
}
err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler)
assert.Error(t, err)
}
func TestPartialSuccess_traces(t *testing.T) {
srv := createBackend("/v1/traces", func(writer http.ResponseWriter, _ *http.Request) {
response := ptraceotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedSpans(1)
bytes, err := response.MarshalProto()
assert.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(bytes)
assert.NoError(t, err)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
TracesEndpoint: srv.URL + "/v1/traces",
ClientConfig: confighttp.ClientConfig{},
}
set := exportertest.NewNopSettings(metadata.Type)
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
exp, err := createTraces(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
traces := ptrace.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
require.NoError(t, err)
require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1)
require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success")
}
func TestPartialSuccess_metrics(t *testing.T) {
srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, _ *http.Request) {
response := pmetricotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedDataPoints(1)
bytes, err := response.MarshalProto()
assert.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(bytes)
assert.NoError(t, err)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
MetricsEndpoint: srv.URL + "/v1/metrics",
ClientConfig: confighttp.ClientConfig{},
}
set := exportertest.NewNopSettings(metadata.Type)
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
exp, err := createMetrics(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
metrics := pmetric.NewMetrics()
err = exp.ConsumeMetrics(context.Background(), metrics)
require.NoError(t, err)
require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1)
require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success")
}
func TestPartialSuccess_profiles(t *testing.T) {
srv := createBackend("/v1development/profiles", func(writer http.ResponseWriter, _ *http.Request) {
response := pprofileotlp.NewExportResponse()
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedProfiles(1)
bytes, err := response.MarshalProto()
assert.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(bytes)
assert.NoError(t, err)
})
defer srv.Close()
cfg := &Config{
Encoding: EncodingProto,
ClientConfig: confighttp.ClientConfig{
Endpoint: srv.URL,
},
}
set := exportertest.NewNopSettings(metadata.Type)
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
exp, err := createProfiles(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
profiles := pprofile.NewProfiles()
err = exp.ConsumeProfiles(context.Background(), profiles)
require.NoError(t, err)
require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1)
require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success")
}
func TestEncoding(t *testing.T) {
set := exportertest.NewNopSettings(metadata.Type)
set.BuildInfo.Description = "Collector"
set.BuildInfo.Version = "1.2.3test"
tests := []struct {
name string
encoding EncodingType
expectedEncoding EncodingType
}{
{
name: "proto_encoding",
encoding: EncodingProto,
expectedEncoding: "application/x-protobuf",
},
{
name: "json_encoding",
encoding: EncodingJSON,
expectedEncoding: "application/json",
},
}
t.Run("traces", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("content-type"), tt.expectedEncoding)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
TracesEndpoint: srv.URL + "/v1/traces",
Encoding: tt.encoding,
}
exp, err := createTraces(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
traces := ptrace.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
require.NoError(t, err)
})
}
})
t.Run("metrics", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("content-type"), tt.expectedEncoding)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
MetricsEndpoint: srv.URL + "/v1/metrics",
Encoding: tt.encoding,
}
exp, err := createMetrics(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
metrics := pmetric.NewMetrics()
err = exp.ConsumeMetrics(context.Background(), metrics)
require.NoError(t, err)
})
}
})
t.Run("logs", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("content-type"), tt.expectedEncoding)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
LogsEndpoint: srv.URL + "/v1/logs",
Encoding: tt.encoding,
}
exp, err := createLogs(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
logs := plog.NewLogs()
err = exp.ConsumeLogs(context.Background(), logs)
require.NoError(t, err)
srv.Close()
})
}
})
t.Run("profiles", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
srv := createBackend("/v1development/profiles", func(writer http.ResponseWriter, request *http.Request) {
assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding)
writer.WriteHeader(http.StatusOK)
})
defer srv.Close()
cfg := &Config{
ClientConfig: confighttp.ClientConfig{
Endpoint: srv.URL,
},
Encoding: test.encoding,
}
exp, err := createProfiles(context.Background(), set, cfg)
require.NoError(t, err)
// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
// generate data
profiles := pprofile.NewProfiles()
err = exp.ConsumeProfiles(context.Background(), profiles)
require.NoError(t, err)
})
}
})
}
func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(endpoint, handler)
srv := httptest.NewServer(mux)
return srv
}
type badReader struct{}
func (b badReader) Read([]byte) (int, error) {
return 0, errors.New("Bad read")
}