Remove workers from OpenCensus receiver implementation (#497)

Workers were an unnecessary complication and did not provide any measurable
performance gains.

In addition, they decoupled the receiver from subsequent processors, which
makes the upcoming implementation of backpressure in the pipeline
(from processors to receivers) difficult.

Test results showing that workers give no performance benefit (any
differences are within the measurement error margin):

Without workers (this commit):

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Trace10kSPS/OpenCensus                  |PASS  |     16s|    22.7|    25.9|         37|         45|    149880|        149880|

With workers (latest `master` branch):

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Trace10kSPS/OpenCensus                  |PASS  |     16s|    23.3|    24.9|         37|         46|    149920|        149920|
This commit is contained in:
Tigran Najaryan 2020-01-13 15:54:33 -05:00 committed by GitHub
parent 4e01fa34ff
commit ec4ad0c650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 217 additions and 101 deletions

View File

@ -30,23 +30,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/oterr"
)
// Defaults used when constructing a Receiver.
const (
// defaultNumWorkers is the initial value of Receiver.numWorkers set by New
// before any Option is applied.
defaultNumWorkers = 4
// messageChannelSize is the buffer capacity of the channel created in New
// for handing received trace data to the workers.
messageChannelSize = 64
)
// Receiver is the type used to handle spans from OpenCensus exporters.
type Receiver struct {
// nextConsumer is the consumer that received trace data is passed to via
// ConsumeTraceData.
nextConsumer consumer.TraceConsumer
// numWorkers is the number of worker goroutines New starts; it defaults to
// defaultNumWorkers and can be changed with WithWorkerCount.
numWorkers int
// workers holds the workers started by New so that Stop can stop them.
workers []*receiverWorker
// messageChan is the buffered channel on which Export enqueues trace data
// and from which the workers read.
messageChan chan *traceDataWithCtx
}
// traceDataWithCtx is the message sent on Receiver.messageChan: a batch of
// trace data together with the context it was received under.
type traceDataWithCtx struct {
// data is the trace data to hand to the worker's export method.
data *consumerdata.TraceData
// ctx is the context passed along with data to the worker's export method.
ctx context.Context
}
// New creates a new opencensus.Receiver reference.
@ -55,25 +41,13 @@ func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error)
return nil, oterr.ErrNilNextConsumer
}
messageChan := make(chan *traceDataWithCtx, messageChannelSize)
ocr := &Receiver{
nextConsumer: nextConsumer,
numWorkers: defaultNumWorkers,
messageChan: messageChan,
}
for _, opt := range opts {
opt(ocr)
}
// Setup and startup worker pool
workers := make([]*receiverWorker, 0, ocr.numWorkers)
for index := 0; index < ocr.numWorkers; index++ {
worker := newReceiverWorker(ocr)
go worker.listenOn(messageChan)
workers = append(workers, worker)
}
ocr.workers = workers
return ocr, nil
}
@ -89,13 +63,11 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
const receiverTagValue = "oc_trace"
// Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications.
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), receiverTagValue)
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), "oc_trace")
// The first message MUST have a non-nil Node.
recv, err := tes.Recv()
@ -112,6 +84,31 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
var resource *resourcepb.Resource
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
for {
lastNonNilNode, resource, err = ocr.processReceivedMsg(ctxWithReceiverName, lastNonNilNode, resource, recv)
if err != nil {
// Metrics and z-pages record data loss but there is no back pressure.
// However, cause the stream to be closed.
return nil
}
recv, err = tes.Recv()
if err != nil {
if err == io.EOF {
// Do not return EOF as an error so that grpc-gateway calls get an empty
// response with HTTP status code 200 rather than a 500 error with EOF.
return nil
}
return err
}
}
}
func (ocr *Receiver) processReceivedMsg(
ctx context.Context,
lastNonNilNode *commonpb.Node,
resource *resourcepb.Resource,
recv *agenttracepb.ExportTraceServiceRequest,
) (*commonpb.Node, *resourcepb.Resource, error) {
// If a Node has been sent from downstream, save and use it.
if recv.Node != nil {
lastNonNilNode = recv.Node
@ -130,79 +127,40 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
SourceFormat: "oc_trace",
}
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}
observability.RecordMetricsForTraceReceiver(ctxWithReceiverName, len(td.Spans), 0)
recv, err = tes.Recv()
if err != nil {
if err == io.EOF {
// Do not return EOF as an error so that grpc-gateway calls get an empty
// response with HTTP status code 200 rather than a 500 error with EOF.
return nil
}
return err
}
}
err := ocr.sendToNextConsumer(ctx, td)
return lastNonNilNode, resource, err
}
// Stop asks every worker owned by the receiver to stop listening for
// messages.
func (ocr *Receiver) Stop() {
	for i := range ocr.workers {
		ocr.workers[i].stopListening()
	}
}
// receiverWorker reads trace data from a channel (see listenOn) on behalf of
// a Receiver until it is told to stop.
type receiverWorker struct {
// receiver is the Receiver this worker was created for.
receiver *Receiver
// cancel is closed by stopListening to make listenOn return.
cancel chan struct{}
}
// newReceiverWorker builds a worker bound to the given receiver, with a fresh
// unbuffered cancel channel. The worker is not started; the caller is
// expected to run listenOn.
func newReceiverWorker(receiver *Receiver) *receiverWorker {
	rw := new(receiverWorker)
	rw.receiver = receiver
	rw.cancel = make(chan struct{})
	return rw
}
// listenOn loops forever, exporting every message read from cn, and returns
// once the worker's cancel channel becomes readable (i.e. after
// stopListening closes it).
func (rw *receiverWorker) listenOn(cn <-chan *traceDataWithCtx) {
	for {
		select {
		case <-rw.cancel:
			return
		case msg := <-cn:
			rw.export(msg.ctx, msg.data)
		}
	}
}
// stopListening closes the worker's cancel channel, which makes listenOn
// return. It must be called at most once per worker: closing an
// already-closed channel panics.
func (rw *receiverWorker) stopListening() {
close(rw.cancel)
}
func (rw *receiverWorker) export(longLivedCtx context.Context, tracedata *consumerdata.TraceData) {
func (ocr *Receiver) sendToNextConsumer(longLivedCtx context.Context, tracedata *consumerdata.TraceData) error {
if tracedata == nil {
return
return nil
}
if len(tracedata.Spans) == 0 {
return
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, 0)
return nil
}
// Trace this method
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
defer span.End()
// TODO: (@odeke-em) investigate if it is necessary
// to group nodes with their respective spans during
// spansAndNode list unfurling then send spans grouped per node
// If the starting RPC has a parent span, then add it as a parent link.
observability.SetParentLink(longLivedCtx, span)
rw.receiver.nextConsumer.ConsumeTraceData(ctx, *tracedata)
err := ocr.nextConsumer.ConsumeTraceData(ctx, *tracedata)
if err != nil {
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, len(tracedata.Spans))
span.AddAttributes(trace.Int64Attribute("dropped_spans", int64(len(tracedata.Spans))))
span.Annotate([]trace.Attribute{
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))),
}, "")
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: err.Error(),
})
} else {
observability.RecordMetricsForTraceReceiver(longLivedCtx, len(tracedata.Spans), 0)
span.AddAttributes(trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))))
}
return err
}

View File

@ -18,11 +18,3 @@ package octrace
//
// WithReceiver applies the configuration to the given receiver.
type Option func(*Receiver)
// WithWorkerCount returns an Option that stores workerCount in the
// receiver's numWorkers field, i.e. the number of worker goroutines that
// will be started for the receiver.
func WithWorkerCount(workerCount int) Option {
	setWorkerCount := func(r *Receiver) {
		r.numWorkers = workerCount
	}
	return setWorkerCount
}

View File

@ -195,12 +195,6 @@ func (ocr *Receiver) stop() error {
ocr.stopOnce.Do(func() {
err = nil
if ocr.traceReceiver != nil {
ocr.traceReceiver.Stop()
}
// Currently there is no symmetric stop for metrics receiver.
if ocr.serverHTTP != nil {
_ = ocr.serverHTTP.Close()
}

View File

@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
@ -28,14 +29,19 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/internal"
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
"github.com/open-telemetry/opentelemetry-collector/testutils"
)
@ -365,3 +371,169 @@ func TestStartWithoutConsumersShouldFail(t *testing.T) {
mh := component.NewMockHost()
require.Error(t, r.Start(mh))
}
// TestOCReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
// next trace consumer, checks if data was passed down the pipeline and if
// proper metrics were recorded. Every entry of the exporters table is run
// against every test case; currently the table holds only the bidirectional
// gRPC stream exporter.
func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
	// ingestionStateTest describes one export attempt: whether the sink
	// accepts the data and which gRPC status code the client should observe.
	type ingestionStateTest struct {
		okToIngest   bool
		expectedCode codes.Code
	}

	tests := []struct {
		name                    string
		expectedReceivedBatches int
		// NOTE(review): expectedIngestionBlockedRPCs is populated below but is
		// not asserted anywhere in this test.
		expectedIngestionBlockedRPCs int
		ingestionStates              []ingestionStateTest
	}{
		{
			name:                         "IngestTest",
			expectedReceivedBatches:      2,
			expectedIngestionBlockedRPCs: 1,
			ingestionStates: []ingestionStateTest{
				{
					okToIngest:   true,
					expectedCode: codes.OK,
				},
				{
					// The consumer rejects this batch, but the client is
					// still expected to see codes.OK.
					okToIngest:   false,
					expectedCode: codes.OK,
				},
				{
					okToIngest:   true,
					expectedCode: codes.OK,
				},
			},
		},
	}

	addr := testutils.GetAvailableLocalAddress(t)
	msg := &agenttracepb.ExportTraceServiceRequest{
		Node: &commonpb.Node{
			ServiceInfo: &commonpb.ServiceInfo{Name: "test-svc"},
		},
		Spans: []*tracepb.Span{
			{
				TraceId: []byte{
					0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
					0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
				},
			},
		},
	}

	// exportBidiFn sends msg over the bidirectional Export stream, closes the
	// send side, and drains the stream. A terminating io.EOF is reported as
	// success (nil); any other error is returned to the caller.
	exportBidiFn := func(
		t *testing.T,
		cc *grpc.ClientConn,
		msg *agenttracepb.ExportTraceServiceRequest) error {

		acc := agenttracepb.NewTraceServiceClient(cc)
		stream, err := acc.Export(context.Background())
		require.NoError(t, err)
		require.NotNil(t, stream)

		err = stream.Send(msg)
		stream.CloseSend()
		if err == nil {
			for {
				if _, err = stream.Recv(); err != nil {
					if err == io.EOF {
						err = nil
					}
					break
				}
			}
		}

		return err
	}

	exporters := []struct {
		receiverTag string
		exportFn    func(
			t *testing.T,
			cc *grpc.ClientConn,
			msg *agenttracepb.ExportTraceServiceRequest) error
	}{
		{
			receiverTag: "oc_trace",
			exportFn:    exportBidiFn,
		},
	}

	for _, exporter := range exporters {
		for _, tt := range tests {
			t.Run(tt.name+"/"+exporter.receiverTag, func(t *testing.T) {
				doneFn := observabilitytest.SetupRecordedMetricsTest()
				defer doneFn()

				sink := new(sinkTraceConsumer)

				var opts []Option
				ocr, err := New(addr, nil, nil, opts...)
				require.Nil(t, err)
				require.NotNil(t, ocr)

				ocr.traceConsumer = sink
				err = ocr.Start(component.NewMockHost())
				require.Nil(t, err)
				defer ocr.Shutdown()

				cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
				// Abort immediately on a dial failure: continuing would
				// register a deferred Close on, and export through, a nil
				// connection.
				require.NoError(t, err, "grpc.Dial")
				defer cc.Close()

				for _, ingestionState := range tt.ingestionStates {
					if ingestionState.okToIngest {
						sink.SetConsumeTraceError(nil)
					} else {
						sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name))
					}

					err = exporter.exportFn(t, cc, msg)

					// Named st so the imported status package is not shadowed.
					st, ok := status.FromError(err)
					require.True(t, ok)
					assert.Equal(t, ingestionState.expectedCode, st.Code())
				}

				require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces()))
				require.Nil(
					t,
					observabilitytest.CheckValueViewReceiverReceivedSpans(
						exporter.receiverTag,
						tt.expectedReceivedBatches),
				)
			})
		}
	}
}
// sinkTraceConsumer is a test consumer whose ConsumeTraceData result can be
// configured and which records the trace data it accepts.
// NOTE(review): the fields are read and written without any synchronization;
// confirm that callers never use them concurrently.
type sinkTraceConsumer struct {
// consumeTraceError is the error to be returned when ConsumeTraceData is
// called
consumeTraceError error
// traces holds every TraceData passed to ConsumeTraceData while
// consumeTraceError was nil.
traces []consumerdata.TraceData
}
var _ consumer.TraceConsumer = (*sinkTraceConsumer)(nil)
// ConsumeTraceData returns the configured error if one is set; otherwise it
// records td and reports success.
func (stc *sinkTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
	if configuredErr := stc.consumeTraceError; configuredErr != nil {
		return configuredErr
	}
	stc.traces = append(stc.traces, td)
	return nil
}
// SetConsumeTraceError sets the error that subsequent ConsumeTraceData calls
// return. Passing nil makes the sink accept and record data again.
func (stc *sinkTraceConsumer) SetConsumeTraceError(err error) {
stc.consumeTraceError = err
}
// AllTraces returns the trace data recorded so far. The returned slice
// shares its backing array with the sink; it is not a copy.
func (stc *sinkTraceConsumer) AllTraces() []consumerdata.TraceData {
	recorded := stc.traces
	return recorded
}