Handle errors in exporters (#1259)

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
This commit is contained in:
Juraci Paixão Kröhling 2020-07-03 15:41:46 +02:00 committed by GitHub
parent 117cf7a5a6
commit 64403a6683
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 18 deletions

View File

@ -16,6 +16,7 @@ package jaegerexporter
import (
"context"
"fmt"
jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"google.golang.org/grpc"
@ -70,7 +71,7 @@ func (s *protoGRPCSender) pushTraceData(
batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err))
}
if s.metadata.Len() > 0 {
@ -83,7 +84,7 @@ func (s *protoGRPCSender) pushTraceData(
ctx,
&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))
if err != nil {
return td.SpanCount() - sentSpans, err
return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
}
sentSpans += len(batch.Spans)
}

View File

@ -156,7 +156,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}
err := exporter.ExportTraceServiceRequest(
@ -168,7 +168,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
)
oce.exporters <- exporter
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}
return 0, nil
}
@ -181,7 +181,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}
req := &agentmetricspb.ExportMetricsServiceRequest{
@ -192,7 +192,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
err := exporter.ExportMetricsServiceRequest(req)
oce.exporters <- exporter
if err != nil {
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}
return 0, nil
}

View File

@ -158,7 +158,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
err := oce.exporter.exportTrace(ctx, request)
if err != nil {
return td.SpanCount(), err
return td.SpanCount(), fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
}
return 0, nil
}
@ -171,7 +171,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
err := oce.exporter.exportMetrics(ctx, request)
if err != nil {
return imd.MetricCount(), err
return imd.MetricCount(), fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err)
}
return 0, nil
}
@ -183,7 +183,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
err := oce.exporter.exportLogs(ctx, request)
if err != nil {
return logs.LogRecordCount(), err
return logs.LogRecordCount(), fmt.Errorf("failed to push log data via OTLP exporter: %w", err)
}
return 0, nil
}

View File

@ -83,33 +83,32 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) {
return ze, nil
}
func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))
var resource *resourcepb.Resource = td.Resource
for _, span := range td.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
tbatch = append(tbatch, zs)
}
body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
req.Header.Set("Content-Type", ze.serializer.ContentType())
resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {

View File

@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() {
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batchTraces.reset()
}