Remove workers from OpenCensus receiver implementation (#497)
Workers were unnecessary complication and did not provide any measurable performance gains. In addition they were decoupling receiver from subsequent processors which makes difficult upcoming implementation of backpressure in the pipeline (from processors to receivers). Test results that shows that worker give no performance benefits (any differences are within the measurement error margin): Without workers (this commit): Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Trace10kSPS/OpenCensus |PASS | 16s| 22.7| 25.9| 37| 45| 149880| 149880| With workers (latest `master` branch): Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Trace10kSPS/OpenCensus |PASS | 16s| 23.3| 24.9| 37| 46| 149920| 149920|
This commit is contained in:
parent
4e01fa34ff
commit
ec4ad0c650
|
@ -30,23 +30,9 @@ import (
|
|||
"github.com/open-telemetry/opentelemetry-collector/oterr"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultNumWorkers = 4
|
||||
|
||||
messageChannelSize = 64
|
||||
)
|
||||
|
||||
// Receiver is the type used to handle spans from OpenCensus exporters.
|
||||
type Receiver struct {
|
||||
nextConsumer consumer.TraceConsumer
|
||||
numWorkers int
|
||||
workers []*receiverWorker
|
||||
messageChan chan *traceDataWithCtx
|
||||
}
|
||||
|
||||
type traceDataWithCtx struct {
|
||||
data *consumerdata.TraceData
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// New creates a new opencensus.Receiver reference.
|
||||
|
@ -55,25 +41,13 @@ func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error)
|
|||
return nil, oterr.ErrNilNextConsumer
|
||||
}
|
||||
|
||||
messageChan := make(chan *traceDataWithCtx, messageChannelSize)
|
||||
ocr := &Receiver{
|
||||
nextConsumer: nextConsumer,
|
||||
numWorkers: defaultNumWorkers,
|
||||
messageChan: messageChan,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(ocr)
|
||||
}
|
||||
|
||||
// Setup and startup worker pool
|
||||
workers := make([]*receiverWorker, 0, ocr.numWorkers)
|
||||
for index := 0; index < ocr.numWorkers; index++ {
|
||||
worker := newReceiverWorker(ocr)
|
||||
go worker.listenOn(messageChan)
|
||||
workers = append(workers, worker)
|
||||
}
|
||||
ocr.workers = workers
|
||||
|
||||
return ocr, nil
|
||||
}
|
||||
|
||||
|
@ -89,13 +63,11 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
|
|||
|
||||
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
|
||||
|
||||
const receiverTagValue = "oc_trace"
|
||||
|
||||
// Export is the gRPC method that receives streamed traces from
|
||||
// OpenCensus-traceproto compatible libraries/applications.
|
||||
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
||||
// We need to ensure that it propagates the receiver name as a tag
|
||||
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), receiverTagValue)
|
||||
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), "oc_trace")
|
||||
|
||||
// The first message MUST have a non-nil Node.
|
||||
recv, err := tes.Recv()
|
||||
|
@ -112,6 +84,31 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
|||
var resource *resourcepb.Resource
|
||||
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
|
||||
for {
|
||||
lastNonNilNode, resource, err = ocr.processReceivedMsg(ctxWithReceiverName, lastNonNilNode, resource, recv)
|
||||
if err != nil {
|
||||
// Metrics and z-pages record data loss but there is no back pressure.
|
||||
// However, cause the stream to be closed.
|
||||
return nil
|
||||
}
|
||||
|
||||
recv, err = tes.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// Do not return EOF as an error so that grpc-gateway calls get an empty
|
||||
// response with HTTP status code 200 rather than a 500 error with EOF.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ocr *Receiver) processReceivedMsg(
|
||||
ctx context.Context,
|
||||
lastNonNilNode *commonpb.Node,
|
||||
resource *resourcepb.Resource,
|
||||
recv *agenttracepb.ExportTraceServiceRequest,
|
||||
) (*commonpb.Node, *resourcepb.Resource, error) {
|
||||
// If a Node has been sent from downstream, save and use it.
|
||||
if recv.Node != nil {
|
||||
lastNonNilNode = recv.Node
|
||||
|
@ -130,79 +127,40 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
|||
SourceFormat: "oc_trace",
|
||||
}
|
||||
|
||||
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}
|
||||
|
||||
observability.RecordMetricsForTraceReceiver(ctxWithReceiverName, len(td.Spans), 0)
|
||||
|
||||
recv, err = tes.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// Do not return EOF as an error so that grpc-gateway calls get an empty
|
||||
// response with HTTP status code 200 rather than a 500 error with EOF.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := ocr.sendToNextConsumer(ctx, td)
|
||||
return lastNonNilNode, resource, err
|
||||
}
|
||||
|
||||
// Stop the receiver and its workers
|
||||
func (ocr *Receiver) Stop() {
|
||||
for _, worker := range ocr.workers {
|
||||
worker.stopListening()
|
||||
}
|
||||
}
|
||||
|
||||
type receiverWorker struct {
|
||||
receiver *Receiver
|
||||
cancel chan struct{}
|
||||
}
|
||||
|
||||
func newReceiverWorker(receiver *Receiver) *receiverWorker {
|
||||
return &receiverWorker{
|
||||
receiver: receiver,
|
||||
cancel: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (rw *receiverWorker) listenOn(cn <-chan *traceDataWithCtx) {
|
||||
for {
|
||||
select {
|
||||
case tdWithCtx := <-cn:
|
||||
rw.export(tdWithCtx.ctx, tdWithCtx.data)
|
||||
case <-rw.cancel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rw *receiverWorker) stopListening() {
|
||||
close(rw.cancel)
|
||||
}
|
||||
|
||||
func (rw *receiverWorker) export(longLivedCtx context.Context, tracedata *consumerdata.TraceData) {
|
||||
func (ocr *Receiver) sendToNextConsumer(longLivedCtx context.Context, tracedata *consumerdata.TraceData) error {
|
||||
if tracedata == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(tracedata.Spans) == 0 {
|
||||
return
|
||||
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Trace this method
|
||||
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
|
||||
defer span.End()
|
||||
|
||||
// TODO: (@odeke-em) investigate if it is necessary
|
||||
// to group nodes with their respective spans during
|
||||
// spansAndNode list unfurling then send spans grouped per node
|
||||
|
||||
// If the starting RPC has a parent span, then add it as a parent link.
|
||||
observability.SetParentLink(longLivedCtx, span)
|
||||
|
||||
rw.receiver.nextConsumer.ConsumeTraceData(ctx, *tracedata)
|
||||
err := ocr.nextConsumer.ConsumeTraceData(ctx, *tracedata)
|
||||
if err != nil {
|
||||
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, len(tracedata.Spans))
|
||||
span.AddAttributes(trace.Int64Attribute("dropped_spans", int64(len(tracedata.Spans))))
|
||||
|
||||
span.Annotate([]trace.Attribute{
|
||||
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))),
|
||||
}, "")
|
||||
span.SetStatus(trace.Status{
|
||||
Code: trace.StatusCodeUnknown,
|
||||
Message: err.Error(),
|
||||
})
|
||||
} else {
|
||||
observability.RecordMetricsForTraceReceiver(longLivedCtx, len(tracedata.Spans), 0)
|
||||
span.AddAttributes(trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,11 +18,3 @@ package octrace
|
|||
//
|
||||
// WithReceiver applies the configuration to the given receiver.
|
||||
type Option func(*Receiver)
|
||||
|
||||
// WithWorkerCount sets the number of worker goroutines that will be started
|
||||
// for the receiver
|
||||
func WithWorkerCount(workerCount int) Option {
|
||||
return func(r *Receiver) {
|
||||
r.numWorkers = workerCount
|
||||
}
|
||||
}
|
||||
|
|
|
@ -195,12 +195,6 @@ func (ocr *Receiver) stop() error {
|
|||
ocr.stopOnce.Do(func() {
|
||||
err = nil
|
||||
|
||||
if ocr.traceReceiver != nil {
|
||||
ocr.traceReceiver.Stop()
|
||||
}
|
||||
|
||||
// Currently there is no symmetric stop for metrics receiver.
|
||||
|
||||
if ocr.serverHTTP != nil {
|
||||
_ = ocr.serverHTTP.Close()
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -28,14 +29,19 @@ import (
|
|||
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
|
||||
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
|
||||
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/open-telemetry/opentelemetry-collector/component"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
||||
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
|
||||
"github.com/open-telemetry/opentelemetry-collector/internal"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
|
||||
"github.com/open-telemetry/opentelemetry-collector/testutils"
|
||||
)
|
||||
|
||||
|
@ -365,3 +371,169 @@ func TestStartWithoutConsumersShouldFail(t *testing.T) {
|
|||
mh := component.NewMockHost()
|
||||
require.Error(t, r.Start(mh))
|
||||
}
|
||||
|
||||
// TestOCReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
|
||||
// is returning the proper response (return and metrics) when the next consumer
|
||||
// in the pipeline reports error. The test changes the responses returned by the
|
||||
// next trace consumer, checks if data was passed down the pipeline and if
|
||||
// proper metrics were recorded. It also uses all endpoints supported by the
|
||||
// trace receiver.
|
||||
func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
|
||||
type ingestionStateTest struct {
|
||||
okToIngest bool
|
||||
expectedCode codes.Code
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedReceivedBatches int
|
||||
expectedIngestionBlockedRPCs int
|
||||
ingestionStates []ingestionStateTest
|
||||
}{
|
||||
{
|
||||
name: "IngestTest",
|
||||
expectedReceivedBatches: 2,
|
||||
expectedIngestionBlockedRPCs: 1,
|
||||
ingestionStates: []ingestionStateTest{
|
||||
{
|
||||
okToIngest: true,
|
||||
expectedCode: codes.OK,
|
||||
},
|
||||
{
|
||||
okToIngest: false,
|
||||
expectedCode: codes.OK,
|
||||
},
|
||||
{
|
||||
okToIngest: true,
|
||||
expectedCode: codes.OK,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
addr := testutils.GetAvailableLocalAddress(t)
|
||||
msg := &agenttracepb.ExportTraceServiceRequest{
|
||||
Node: &commonpb.Node{
|
||||
ServiceInfo: &commonpb.ServiceInfo{Name: "test-svc"},
|
||||
},
|
||||
Spans: []*tracepb.Span{
|
||||
{
|
||||
TraceId: []byte{
|
||||
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
|
||||
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
exportBidiFn := func(
|
||||
t *testing.T,
|
||||
cc *grpc.ClientConn,
|
||||
msg *agenttracepb.ExportTraceServiceRequest) error {
|
||||
|
||||
acc := agenttracepb.NewTraceServiceClient(cc)
|
||||
stream, err := acc.Export(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stream)
|
||||
|
||||
err = stream.Send(msg)
|
||||
stream.CloseSend()
|
||||
if err == nil {
|
||||
for {
|
||||
if _, err = stream.Recv(); err != nil {
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
exporters := []struct {
|
||||
receiverTag string
|
||||
exportFn func(
|
||||
t *testing.T,
|
||||
cc *grpc.ClientConn,
|
||||
msg *agenttracepb.ExportTraceServiceRequest) error
|
||||
}{
|
||||
{
|
||||
receiverTag: "oc_trace",
|
||||
exportFn: exportBidiFn,
|
||||
},
|
||||
}
|
||||
for _, exporter := range exporters {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name+"/"+exporter.receiverTag, func(t *testing.T) {
|
||||
doneFn := observabilitytest.SetupRecordedMetricsTest()
|
||||
defer doneFn()
|
||||
|
||||
sink := new(sinkTraceConsumer)
|
||||
|
||||
var opts []Option
|
||||
ocr, err := New(addr, nil, nil, opts...)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, ocr)
|
||||
|
||||
ocr.traceConsumer = sink
|
||||
err = ocr.Start(component.NewMockHost())
|
||||
require.Nil(t, err)
|
||||
defer ocr.Shutdown()
|
||||
|
||||
cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
|
||||
if err != nil {
|
||||
t.Errorf("grpc.Dial: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
for _, ingestionState := range tt.ingestionStates {
|
||||
if ingestionState.okToIngest {
|
||||
sink.SetConsumeTraceError(nil)
|
||||
} else {
|
||||
sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name))
|
||||
}
|
||||
|
||||
err = exporter.exportFn(t, cc, msg)
|
||||
|
||||
status, ok := status.FromError(err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, ingestionState.expectedCode, status.Code())
|
||||
}
|
||||
|
||||
require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces()))
|
||||
require.Nil(
|
||||
t,
|
||||
observabilitytest.CheckValueViewReceiverReceivedSpans(
|
||||
exporter.receiverTag,
|
||||
tt.expectedReceivedBatches),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type sinkTraceConsumer struct {
|
||||
// consumeTraceError is the error to be returned when ConsumeTraceData is
|
||||
// called
|
||||
consumeTraceError error
|
||||
|
||||
traces []consumerdata.TraceData
|
||||
}
|
||||
|
||||
var _ consumer.TraceConsumer = (*sinkTraceConsumer)(nil)
|
||||
|
||||
func (stc *sinkTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
|
||||
if stc.consumeTraceError == nil {
|
||||
stc.traces = append(stc.traces, td)
|
||||
}
|
||||
return stc.consumeTraceError
|
||||
}
|
||||
|
||||
func (stc *sinkTraceConsumer) SetConsumeTraceError(err error) {
|
||||
stc.consumeTraceError = err
|
||||
}
|
||||
|
||||
func (stc *sinkTraceConsumer) AllTraces() []consumerdata.TraceData {
|
||||
return stc.traces[:]
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue