Remove workers from OpenCensus receiver implementation (#497)

Workers were an unnecessary complication and did not provide any measurable
performance gains.

In addition, they decoupled the receiver from subsequent processors, which
makes the upcoming implementation of backpressure in the pipeline (from
processors to receivers) difficult.

Test results showing that workers give no performance benefit (any
differences are within the measurement error margin):

Without workers (this commit):

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Trace10kSPS/OpenCensus                  |PASS  |     16s|    22.7|    25.9|         37|         45|    149880|        149880|

With workers (latest `master` branch):

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Trace10kSPS/OpenCensus                  |PASS  |     16s|    23.3|    24.9|         37|         46|    149920|        149920|
This commit is contained in:
Tigran Najaryan 2020-01-13 15:54:33 -05:00 committed by GitHub
parent 4e01fa34ff
commit ec4ad0c650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 217 additions and 101 deletions

View File

@ -30,23 +30,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/oterr" "github.com/open-telemetry/opentelemetry-collector/oterr"
) )
const (
defaultNumWorkers = 4
messageChannelSize = 64
)
// Receiver is the type used to handle spans from OpenCensus exporters. // Receiver is the type used to handle spans from OpenCensus exporters.
type Receiver struct { type Receiver struct {
nextConsumer consumer.TraceConsumer nextConsumer consumer.TraceConsumer
numWorkers int
workers []*receiverWorker
messageChan chan *traceDataWithCtx
}
type traceDataWithCtx struct {
data *consumerdata.TraceData
ctx context.Context
} }
// New creates a new opencensus.Receiver reference. // New creates a new opencensus.Receiver reference.
@ -55,25 +41,13 @@ func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error)
return nil, oterr.ErrNilNextConsumer return nil, oterr.ErrNilNextConsumer
} }
messageChan := make(chan *traceDataWithCtx, messageChannelSize)
ocr := &Receiver{ ocr := &Receiver{
nextConsumer: nextConsumer, nextConsumer: nextConsumer,
numWorkers: defaultNumWorkers,
messageChan: messageChan,
} }
for _, opt := range opts { for _, opt := range opts {
opt(ocr) opt(ocr)
} }
// Setup and startup worker pool
workers := make([]*receiverWorker, 0, ocr.numWorkers)
for index := 0; index < ocr.numWorkers; index++ {
worker := newReceiverWorker(ocr)
go worker.listenOn(messageChan)
workers = append(workers, worker)
}
ocr.workers = workers
return ocr, nil return ocr, nil
} }
@ -89,13 +63,11 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node") var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
const receiverTagValue = "oc_trace"
// Export is the gRPC method that receives streamed traces from // Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications. // OpenCensus-traceproto compatible libraries/applications.
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error { func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
// We need to ensure that it propagates the receiver name as a tag // We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), receiverTagValue) ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), "oc_trace")
// The first message MUST have a non-nil Node. // The first message MUST have a non-nil Node.
recv, err := tes.Recv() recv, err := tes.Recv()
@ -112,28 +84,13 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
var resource *resourcepb.Resource var resource *resourcepb.Resource
// Now that we've got the first message with a Node, we can start to receive streamed up spans. // Now that we've got the first message with a Node, we can start to receive streamed up spans.
for { for {
// If a Node has been sent from downstream, save and use it. lastNonNilNode, resource, err = ocr.processReceivedMsg(ctxWithReceiverName, lastNonNilNode, resource, recv)
if recv.Node != nil { if err != nil {
lastNonNilNode = recv.Node // Metrics and z-pages record data loss but there is no back pressure.
// However, cause the stream to be closed.
return nil
} }
// TODO(songya): differentiate between unset and nil resource. See
// https://github.com/census-instrumentation/opencensus-proto/issues/146.
if recv.Resource != nil {
resource = recv.Resource
}
td := &consumerdata.TraceData{
Node: lastNonNilNode,
Resource: resource,
Spans: recv.Spans,
SourceFormat: "oc_trace",
}
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}
observability.RecordMetricsForTraceReceiver(ctxWithReceiverName, len(td.Spans), 0)
recv, err = tes.Recv() recv, err = tes.Recv()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
@ -146,63 +103,64 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
} }
} }
// Stop the receiver and its workers func (ocr *Receiver) processReceivedMsg(
func (ocr *Receiver) Stop() { ctx context.Context,
for _, worker := range ocr.workers { lastNonNilNode *commonpb.Node,
worker.stopListening() resource *resourcepb.Resource,
recv *agenttracepb.ExportTraceServiceRequest,
) (*commonpb.Node, *resourcepb.Resource, error) {
// If a Node has been sent from downstream, save and use it.
if recv.Node != nil {
lastNonNilNode = recv.Node
} }
}
type receiverWorker struct { // TODO(songya): differentiate between unset and nil resource. See
receiver *Receiver // https://github.com/census-instrumentation/opencensus-proto/issues/146.
cancel chan struct{} if recv.Resource != nil {
} resource = recv.Resource
func newReceiverWorker(receiver *Receiver) *receiverWorker {
return &receiverWorker{
receiver: receiver,
cancel: make(chan struct{}),
} }
}
func (rw *receiverWorker) listenOn(cn <-chan *traceDataWithCtx) { td := &consumerdata.TraceData{
for { Node: lastNonNilNode,
select { Resource: resource,
case tdWithCtx := <-cn: Spans: recv.Spans,
rw.export(tdWithCtx.ctx, tdWithCtx.data) SourceFormat: "oc_trace",
case <-rw.cancel:
return
}
} }
err := ocr.sendToNextConsumer(ctx, td)
return lastNonNilNode, resource, err
} }
func (rw *receiverWorker) stopListening() { func (ocr *Receiver) sendToNextConsumer(longLivedCtx context.Context, tracedata *consumerdata.TraceData) error {
close(rw.cancel)
}
func (rw *receiverWorker) export(longLivedCtx context.Context, tracedata *consumerdata.TraceData) {
if tracedata == nil { if tracedata == nil {
return return nil
} }
if len(tracedata.Spans) == 0 { if len(tracedata.Spans) == 0 {
return observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, 0)
return nil
} }
// Trace this method // Trace this method
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export") ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
defer span.End() defer span.End()
// TODO: (@odeke-em) investigate if it is necessary
// to group nodes with their respective spans during
// spansAndNode list unfurling then send spans grouped per node
// If the starting RPC has a parent span, then add it as a parent link. // If the starting RPC has a parent span, then add it as a parent link.
observability.SetParentLink(longLivedCtx, span) observability.SetParentLink(longLivedCtx, span)
rw.receiver.nextConsumer.ConsumeTraceData(ctx, *tracedata) err := ocr.nextConsumer.ConsumeTraceData(ctx, *tracedata)
if err != nil {
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, len(tracedata.Spans))
span.AddAttributes(trace.Int64Attribute("dropped_spans", int64(len(tracedata.Spans))))
span.Annotate([]trace.Attribute{ span.SetStatus(trace.Status{
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))), Code: trace.StatusCodeUnknown,
}, "") Message: err.Error(),
})
} else {
observability.RecordMetricsForTraceReceiver(longLivedCtx, len(tracedata.Spans), 0)
span.AddAttributes(trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))))
}
return err
} }

View File

@ -18,11 +18,3 @@ package octrace
// //
// WithReceiver applies the configuration to the given receiver. // WithReceiver applies the configuration to the given receiver.
type Option func(*Receiver) type Option func(*Receiver)
// WithWorkerCount returns an Option that sets the receiver's numWorkers field
// to workerCount, i.e. the number of worker goroutines that will be started
// for the receiver.
func WithWorkerCount(workerCount int) Option {
return func(r *Receiver) {
r.numWorkers = workerCount
}
}

View File

@ -195,12 +195,6 @@ func (ocr *Receiver) stop() error {
ocr.stopOnce.Do(func() { ocr.stopOnce.Do(func() {
err = nil err = nil
if ocr.traceReceiver != nil {
ocr.traceReceiver.Stop()
}
// Currently there is no symmetric stop for metrics receiver.
if ocr.serverHTTP != nil { if ocr.serverHTTP != nil {
_ = ocr.serverHTTP.Close() _ = ocr.serverHTTP.Close()
} }

View File

@ -18,6 +18,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -28,14 +29,19 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/open-telemetry/opentelemetry-collector/component" "github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/internal" "github.com/open-telemetry/opentelemetry-collector/internal"
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
"github.com/open-telemetry/opentelemetry-collector/testutils" "github.com/open-telemetry/opentelemetry-collector/testutils"
) )
@ -365,3 +371,169 @@ func TestStartWithoutConsumersShouldFail(t *testing.T) {
mh := component.NewMockHost() mh := component.NewMockHost()
require.Error(t, r.Start(mh)) require.Error(t, r.Start(mh))
} }
// TestOCReceiverTrace_HandleNextConsumerResponse checks that the trace
// receiver returns the proper response (return value and metrics) when the
// next consumer in the pipeline reports an error. The test changes the
// responses returned by the next trace consumer, then checks whether data was
// passed down the pipeline and whether the expected metrics were recorded. It
// exercises every endpoint listed in the exporters table below.
func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
	// ingestionStateTest describes a single export attempt: whether the sink
	// accepts the batch and which gRPC status code the client should observe.
	type ingestionStateTest struct {
		okToIngest   bool
		expectedCode codes.Code
	}

	// NOTE(review): expectedIngestionBlockedRPCs is populated but never
	// asserted anywhere in this test — confirm whether a check is missing.
	tests := []struct {
		name                         string
		expectedReceivedBatches      int
		expectedIngestionBlockedRPCs int
		ingestionStates              []ingestionStateTest
	}{
		{
			name:                         "IngestTest",
			expectedReceivedBatches:      2,
			expectedIngestionBlockedRPCs: 1,
			ingestionStates: []ingestionStateTest{
				{
					okToIngest:   true,
					expectedCode: codes.OK,
				},
				{
					okToIngest:   false,
					expectedCode: codes.OK,
				},
				{
					okToIngest:   true,
					expectedCode: codes.OK,
				},
			},
		},
	}

	addr := testutils.GetAvailableLocalAddress(t)

	// Every export sends this same request: one Node and exactly one span, so
	// the number of received spans equals the number of accepted batches.
	msg := &agenttracepb.ExportTraceServiceRequest{
		Node: &commonpb.Node{
			ServiceInfo: &commonpb.ServiceInfo{Name: "test-svc"},
		},
		Spans: []*tracepb.Span{
			{
				TraceId: []byte{
					0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
					0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
				},
			},
		},
	}

	// exportBidiFn sends msg over the bidirectional Export stream, closes the
	// send side, and drains responses until the stream ends. A clean end of
	// stream (io.EOF) is reported as a nil error.
	exportBidiFn := func(
		t *testing.T,
		cc *grpc.ClientConn,
		msg *agenttracepb.ExportTraceServiceRequest) error {

		acc := agenttracepb.NewTraceServiceClient(cc)
		stream, err := acc.Export(context.Background())
		require.NoError(t, err)
		require.NotNil(t, stream)

		err = stream.Send(msg)
		stream.CloseSend()
		if err == nil {
			for {
				if _, err = stream.Recv(); err != nil {
					if err == io.EOF {
						err = nil
					}
					break
				}
			}
		}

		return err
	}

	exporters := []struct {
		receiverTag string
		exportFn    func(
			t *testing.T,
			cc *grpc.ClientConn,
			msg *agenttracepb.ExportTraceServiceRequest) error
	}{
		{
			receiverTag: "oc_trace",
			exportFn:    exportBidiFn,
		},
	}

	for _, exporter := range exporters {
		for _, tt := range tests {
			t.Run(tt.name+"/"+exporter.receiverTag, func(t *testing.T) {
				doneFn := observabilitytest.SetupRecordedMetricsTest()
				defer doneFn()

				sink := new(sinkTraceConsumer)

				var opts []Option
				ocr, err := New(addr, nil, nil, opts...)
				require.NoError(t, err)
				require.NotNil(t, ocr)

				ocr.traceConsumer = sink
				err = ocr.Start(component.NewMockHost())
				require.NoError(t, err)
				defer ocr.Shutdown()

				// Abort on a failed dial: continuing would defer Close and run
				// the exports on a nil connection.
				cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
				require.NoError(t, err, "grpc.Dial")
				defer cc.Close()

				for _, ingestionState := range tt.ingestionStates {
					if ingestionState.okToIngest {
						sink.SetConsumeTraceError(nil)
					} else {
						sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name))
					}

					err = exporter.exportFn(t, cc, msg)

					// Named st so the imported status package is not shadowed.
					st, ok := status.FromError(err)
					require.True(t, ok)
					assert.Equal(t, ingestionState.expectedCode, st.Code())
				}

				require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces()))
				require.NoError(
					t,
					observabilitytest.CheckValueViewReceiverReceivedSpans(
						exporter.receiverTag,
						tt.expectedReceivedBatches),
				)
			})
		}
	}
}
// sinkTraceConsumer is a test consumer.TraceConsumer that either stores every
// batch it is handed or rejects it with a configurable error.
//
// NOTE(review): the fields are not guarded by any synchronization here;
// presumably ConsumeTraceData can run on a different goroutine than the test
// that calls SetConsumeTraceError/AllTraces — confirm under `go test -race`.
type sinkTraceConsumer struct {
// consumeTraceError is the error to be returned when ConsumeTraceData is
// called
consumeTraceError error
// traces holds the batches accepted so far, in arrival order.
traces []consumerdata.TraceData
}
// Compile-time check that *sinkTraceConsumer implements consumer.TraceConsumer.
var _ consumer.TraceConsumer = (*sinkTraceConsumer)(nil)
// ConsumeTraceData rejects td with the configured error when one is set;
// otherwise it appends td to the stored traces and returns nil.
func (stc *sinkTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
	if rejectErr := stc.consumeTraceError; rejectErr != nil {
		return rejectErr
	}
	stc.traces = append(stc.traces, td)
	return nil
}
// SetConsumeTraceError sets the error that subsequent ConsumeTraceData calls
// return. Passing nil makes the sink accept and store batches again.
func (stc *sinkTraceConsumer) SetConsumeTraceError(err error) {
stc.consumeTraceError = err
}
// AllTraces returns the batches accepted so far. The returned slice shares
// its backing array with the sink; it is not a copy.
func (stc *sinkTraceConsumer) AllTraces() []consumerdata.TraceData {
	return stc.traces
}