Remove workers from OpenCensus receiver implementation (#497)
Workers were unnecessary complication and did not provide any measurable performance gains. In addition they were decoupling receiver from subsequent processors which makes difficult upcoming implementation of backpressure in the pipeline (from processors to receivers). Test results that shows that worker give no performance benefits (any differences are within the measurement error margin): Without workers (this commit): Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Trace10kSPS/OpenCensus |PASS | 16s| 22.7| 25.9| 37| 45| 149880| 149880| With workers (latest `master` branch): Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Trace10kSPS/OpenCensus |PASS | 16s| 23.3| 24.9| 37| 46| 149920| 149920|
This commit is contained in:
parent
4e01fa34ff
commit
ec4ad0c650
|
@ -30,23 +30,9 @@ import (
|
||||||
"github.com/open-telemetry/opentelemetry-collector/oterr"
|
"github.com/open-telemetry/opentelemetry-collector/oterr"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
defaultNumWorkers = 4
|
|
||||||
|
|
||||||
messageChannelSize = 64
|
|
||||||
)
|
|
||||||
|
|
||||||
// Receiver is the type used to handle spans from OpenCensus exporters.
|
// Receiver is the type used to handle spans from OpenCensus exporters.
|
||||||
type Receiver struct {
|
type Receiver struct {
|
||||||
nextConsumer consumer.TraceConsumer
|
nextConsumer consumer.TraceConsumer
|
||||||
numWorkers int
|
|
||||||
workers []*receiverWorker
|
|
||||||
messageChan chan *traceDataWithCtx
|
|
||||||
}
|
|
||||||
|
|
||||||
type traceDataWithCtx struct {
|
|
||||||
data *consumerdata.TraceData
|
|
||||||
ctx context.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new opencensus.Receiver reference.
|
// New creates a new opencensus.Receiver reference.
|
||||||
|
@ -55,25 +41,13 @@ func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error)
|
||||||
return nil, oterr.ErrNilNextConsumer
|
return nil, oterr.ErrNilNextConsumer
|
||||||
}
|
}
|
||||||
|
|
||||||
messageChan := make(chan *traceDataWithCtx, messageChannelSize)
|
|
||||||
ocr := &Receiver{
|
ocr := &Receiver{
|
||||||
nextConsumer: nextConsumer,
|
nextConsumer: nextConsumer,
|
||||||
numWorkers: defaultNumWorkers,
|
|
||||||
messageChan: messageChan,
|
|
||||||
}
|
}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(ocr)
|
opt(ocr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup and startup worker pool
|
|
||||||
workers := make([]*receiverWorker, 0, ocr.numWorkers)
|
|
||||||
for index := 0; index < ocr.numWorkers; index++ {
|
|
||||||
worker := newReceiverWorker(ocr)
|
|
||||||
go worker.listenOn(messageChan)
|
|
||||||
workers = append(workers, worker)
|
|
||||||
}
|
|
||||||
ocr.workers = workers
|
|
||||||
|
|
||||||
return ocr, nil
|
return ocr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,13 +63,11 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
|
||||||
|
|
||||||
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
|
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
|
||||||
|
|
||||||
const receiverTagValue = "oc_trace"
|
|
||||||
|
|
||||||
// Export is the gRPC method that receives streamed traces from
|
// Export is the gRPC method that receives streamed traces from
|
||||||
// OpenCensus-traceproto compatible libraries/applications.
|
// OpenCensus-traceproto compatible libraries/applications.
|
||||||
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
||||||
// We need to ensure that it propagates the receiver name as a tag
|
// We need to ensure that it propagates the receiver name as a tag
|
||||||
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), receiverTagValue)
|
ctxWithReceiverName := observability.ContextWithReceiverName(tes.Context(), "oc_trace")
|
||||||
|
|
||||||
// The first message MUST have a non-nil Node.
|
// The first message MUST have a non-nil Node.
|
||||||
recv, err := tes.Recv()
|
recv, err := tes.Recv()
|
||||||
|
@ -112,28 +84,13 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
||||||
var resource *resourcepb.Resource
|
var resource *resourcepb.Resource
|
||||||
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
|
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
|
||||||
for {
|
for {
|
||||||
// If a Node has been sent from downstream, save and use it.
|
lastNonNilNode, resource, err = ocr.processReceivedMsg(ctxWithReceiverName, lastNonNilNode, resource, recv)
|
||||||
if recv.Node != nil {
|
if err != nil {
|
||||||
lastNonNilNode = recv.Node
|
// Metrics and z-pages record data loss but there is no back pressure.
|
||||||
|
// However, cause the stream to be closed.
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(songya): differentiate between unset and nil resource. See
|
|
||||||
// https://github.com/census-instrumentation/opencensus-proto/issues/146.
|
|
||||||
if recv.Resource != nil {
|
|
||||||
resource = recv.Resource
|
|
||||||
}
|
|
||||||
|
|
||||||
td := &consumerdata.TraceData{
|
|
||||||
Node: lastNonNilNode,
|
|
||||||
Resource: resource,
|
|
||||||
Spans: recv.Spans,
|
|
||||||
SourceFormat: "oc_trace",
|
|
||||||
}
|
|
||||||
|
|
||||||
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}
|
|
||||||
|
|
||||||
observability.RecordMetricsForTraceReceiver(ctxWithReceiverName, len(td.Spans), 0)
|
|
||||||
|
|
||||||
recv, err = tes.Recv()
|
recv, err = tes.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
@ -146,63 +103,64 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the receiver and its workers
|
func (ocr *Receiver) processReceivedMsg(
|
||||||
func (ocr *Receiver) Stop() {
|
ctx context.Context,
|
||||||
for _, worker := range ocr.workers {
|
lastNonNilNode *commonpb.Node,
|
||||||
worker.stopListening()
|
resource *resourcepb.Resource,
|
||||||
|
recv *agenttracepb.ExportTraceServiceRequest,
|
||||||
|
) (*commonpb.Node, *resourcepb.Resource, error) {
|
||||||
|
// If a Node has been sent from downstream, save and use it.
|
||||||
|
if recv.Node != nil {
|
||||||
|
lastNonNilNode = recv.Node
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
type receiverWorker struct {
|
// TODO(songya): differentiate between unset and nil resource. See
|
||||||
receiver *Receiver
|
// https://github.com/census-instrumentation/opencensus-proto/issues/146.
|
||||||
cancel chan struct{}
|
if recv.Resource != nil {
|
||||||
}
|
resource = recv.Resource
|
||||||
|
|
||||||
func newReceiverWorker(receiver *Receiver) *receiverWorker {
|
|
||||||
return &receiverWorker{
|
|
||||||
receiver: receiver,
|
|
||||||
cancel: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (rw *receiverWorker) listenOn(cn <-chan *traceDataWithCtx) {
|
td := &consumerdata.TraceData{
|
||||||
for {
|
Node: lastNonNilNode,
|
||||||
select {
|
Resource: resource,
|
||||||
case tdWithCtx := <-cn:
|
Spans: recv.Spans,
|
||||||
rw.export(tdWithCtx.ctx, tdWithCtx.data)
|
SourceFormat: "oc_trace",
|
||||||
case <-rw.cancel:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := ocr.sendToNextConsumer(ctx, td)
|
||||||
|
return lastNonNilNode, resource, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *receiverWorker) stopListening() {
|
func (ocr *Receiver) sendToNextConsumer(longLivedCtx context.Context, tracedata *consumerdata.TraceData) error {
|
||||||
close(rw.cancel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rw *receiverWorker) export(longLivedCtx context.Context, tracedata *consumerdata.TraceData) {
|
|
||||||
if tracedata == nil {
|
if tracedata == nil {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tracedata.Spans) == 0 {
|
if len(tracedata.Spans) == 0 {
|
||||||
return
|
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, 0)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trace this method
|
// Trace this method
|
||||||
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
|
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
// TODO: (@odeke-em) investigate if it is necessary
|
|
||||||
// to group nodes with their respective spans during
|
|
||||||
// spansAndNode list unfurling then send spans grouped per node
|
|
||||||
|
|
||||||
// If the starting RPC has a parent span, then add it as a parent link.
|
// If the starting RPC has a parent span, then add it as a parent link.
|
||||||
observability.SetParentLink(longLivedCtx, span)
|
observability.SetParentLink(longLivedCtx, span)
|
||||||
|
|
||||||
rw.receiver.nextConsumer.ConsumeTraceData(ctx, *tracedata)
|
err := ocr.nextConsumer.ConsumeTraceData(ctx, *tracedata)
|
||||||
|
if err != nil {
|
||||||
|
observability.RecordMetricsForTraceReceiver(longLivedCtx, 0, len(tracedata.Spans))
|
||||||
|
span.AddAttributes(trace.Int64Attribute("dropped_spans", int64(len(tracedata.Spans))))
|
||||||
|
|
||||||
span.Annotate([]trace.Attribute{
|
span.SetStatus(trace.Status{
|
||||||
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))),
|
Code: trace.StatusCodeUnknown,
|
||||||
}, "")
|
Message: err.Error(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
observability.RecordMetricsForTraceReceiver(longLivedCtx, len(tracedata.Spans), 0)
|
||||||
|
span.AddAttributes(trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,3 @@ package octrace
|
||||||
//
|
//
|
||||||
// WithReceiver applies the configuration to the given receiver.
|
// WithReceiver applies the configuration to the given receiver.
|
||||||
type Option func(*Receiver)
|
type Option func(*Receiver)
|
||||||
|
|
||||||
// WithWorkerCount sets the number of worker goroutines that will be started
|
|
||||||
// for the receiver
|
|
||||||
func WithWorkerCount(workerCount int) Option {
|
|
||||||
return func(r *Receiver) {
|
|
||||||
r.numWorkers = workerCount
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -195,12 +195,6 @@ func (ocr *Receiver) stop() error {
|
||||||
ocr.stopOnce.Do(func() {
|
ocr.stopOnce.Do(func() {
|
||||||
err = nil
|
err = nil
|
||||||
|
|
||||||
if ocr.traceReceiver != nil {
|
|
||||||
ocr.traceReceiver.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Currently there is no symmetric stop for metrics receiver.
|
|
||||||
|
|
||||||
if ocr.serverHTTP != nil {
|
if ocr.serverHTTP != nil {
|
||||||
_ = ocr.serverHTTP.Close()
|
_ = ocr.serverHTTP.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -28,14 +29,19 @@ import (
|
||||||
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
|
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
|
||||||
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
|
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
|
||||||
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
|
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/open-telemetry/opentelemetry-collector/component"
|
"github.com/open-telemetry/opentelemetry-collector/component"
|
||||||
|
"github.com/open-telemetry/opentelemetry-collector/consumer"
|
||||||
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
||||||
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
|
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
|
||||||
"github.com/open-telemetry/opentelemetry-collector/internal"
|
"github.com/open-telemetry/opentelemetry-collector/internal"
|
||||||
|
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
|
||||||
"github.com/open-telemetry/opentelemetry-collector/testutils"
|
"github.com/open-telemetry/opentelemetry-collector/testutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -365,3 +371,169 @@ func TestStartWithoutConsumersShouldFail(t *testing.T) {
|
||||||
mh := component.NewMockHost()
|
mh := component.NewMockHost()
|
||||||
require.Error(t, r.Start(mh))
|
require.Error(t, r.Start(mh))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestOCReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
|
||||||
|
// is returning the proper response (return and metrics) when the next consumer
|
||||||
|
// in the pipeline reports error. The test changes the responses returned by the
|
||||||
|
// next trace consumer, checks if data was passed down the pipeline and if
|
||||||
|
// proper metrics were recorded. It also uses all endpoints supported by the
|
||||||
|
// trace receiver.
|
||||||
|
func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
|
||||||
|
type ingestionStateTest struct {
|
||||||
|
okToIngest bool
|
||||||
|
expectedCode codes.Code
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
expectedReceivedBatches int
|
||||||
|
expectedIngestionBlockedRPCs int
|
||||||
|
ingestionStates []ingestionStateTest
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "IngestTest",
|
||||||
|
expectedReceivedBatches: 2,
|
||||||
|
expectedIngestionBlockedRPCs: 1,
|
||||||
|
ingestionStates: []ingestionStateTest{
|
||||||
|
{
|
||||||
|
okToIngest: true,
|
||||||
|
expectedCode: codes.OK,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
okToIngest: false,
|
||||||
|
expectedCode: codes.OK,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
okToIngest: true,
|
||||||
|
expectedCode: codes.OK,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
addr := testutils.GetAvailableLocalAddress(t)
|
||||||
|
msg := &agenttracepb.ExportTraceServiceRequest{
|
||||||
|
Node: &commonpb.Node{
|
||||||
|
ServiceInfo: &commonpb.ServiceInfo{Name: "test-svc"},
|
||||||
|
},
|
||||||
|
Spans: []*tracepb.Span{
|
||||||
|
{
|
||||||
|
TraceId: []byte{
|
||||||
|
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
|
||||||
|
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
exportBidiFn := func(
|
||||||
|
t *testing.T,
|
||||||
|
cc *grpc.ClientConn,
|
||||||
|
msg *agenttracepb.ExportTraceServiceRequest) error {
|
||||||
|
|
||||||
|
acc := agenttracepb.NewTraceServiceClient(cc)
|
||||||
|
stream, err := acc.Export(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, stream)
|
||||||
|
|
||||||
|
err = stream.Send(msg)
|
||||||
|
stream.CloseSend()
|
||||||
|
if err == nil {
|
||||||
|
for {
|
||||||
|
if _, err = stream.Recv(); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
exporters := []struct {
|
||||||
|
receiverTag string
|
||||||
|
exportFn func(
|
||||||
|
t *testing.T,
|
||||||
|
cc *grpc.ClientConn,
|
||||||
|
msg *agenttracepb.ExportTraceServiceRequest) error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
receiverTag: "oc_trace",
|
||||||
|
exportFn: exportBidiFn,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, exporter := range exporters {
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name+"/"+exporter.receiverTag, func(t *testing.T) {
|
||||||
|
doneFn := observabilitytest.SetupRecordedMetricsTest()
|
||||||
|
defer doneFn()
|
||||||
|
|
||||||
|
sink := new(sinkTraceConsumer)
|
||||||
|
|
||||||
|
var opts []Option
|
||||||
|
ocr, err := New(addr, nil, nil, opts...)
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.NotNil(t, ocr)
|
||||||
|
|
||||||
|
ocr.traceConsumer = sink
|
||||||
|
err = ocr.Start(component.NewMockHost())
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer ocr.Shutdown()
|
||||||
|
|
||||||
|
cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("grpc.Dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
for _, ingestionState := range tt.ingestionStates {
|
||||||
|
if ingestionState.okToIngest {
|
||||||
|
sink.SetConsumeTraceError(nil)
|
||||||
|
} else {
|
||||||
|
sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = exporter.exportFn(t, cc, msg)
|
||||||
|
|
||||||
|
status, ok := status.FromError(err)
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, ingestionState.expectedCode, status.Code())
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces()))
|
||||||
|
require.Nil(
|
||||||
|
t,
|
||||||
|
observabilitytest.CheckValueViewReceiverReceivedSpans(
|
||||||
|
exporter.receiverTag,
|
||||||
|
tt.expectedReceivedBatches),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type sinkTraceConsumer struct {
|
||||||
|
// consumeTraceError is the error to be returned when ConsumeTraceData is
|
||||||
|
// called
|
||||||
|
consumeTraceError error
|
||||||
|
|
||||||
|
traces []consumerdata.TraceData
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ consumer.TraceConsumer = (*sinkTraceConsumer)(nil)
|
||||||
|
|
||||||
|
func (stc *sinkTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
|
||||||
|
if stc.consumeTraceError == nil {
|
||||||
|
stc.traces = append(stc.traces, td)
|
||||||
|
}
|
||||||
|
return stc.consumeTraceError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stc *sinkTraceConsumer) SetConsumeTraceError(err error) {
|
||||||
|
stc.consumeTraceError = err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stc *sinkTraceConsumer) AllTraces() []consumerdata.TraceData {
|
||||||
|
return stc.traces[:]
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue