receiver: use TraceData and MetricsData in receivers. (#224)

* receiver: use TraceData and MetricsData in receivers.

* Fix E2E test.
This commit is contained in:
Yang Song 2018-11-20 15:46:40 -08:00 committed by Bogdan Drutu
parent c6e57516b6
commit f35ec16487
13 changed files with 72 additions and 72 deletions

View File

@ -19,6 +19,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/receiver"
)
@ -53,15 +54,15 @@ func (tes traceExporters) ExportSpans(ctx context.Context, node *commonpb.Node,
return nil
}
// ReceiveSpans receives the span data in the protobuf format, translates it, and forwards the transformed
// ReceiveTraceData receives the span data in the protobuf format, translates it, and forwards the transformed
// span data to all trace exporters wrapped by the current one.
func (tes traceExporters) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
func (tes traceExporters) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
for _, te := range tes {
_ = te.ExportSpans(ctx, node, spans...)
_ = te.ExportSpans(ctx, td.Node, td.Spans...)
}
ack := &receiver.TraceReceiverAcknowledgement{
SavedSpans: uint64(len(spans)),
SavedSpans: uint64(len(td.Spans)),
}
return ack, nil
}

View File

@ -18,6 +18,7 @@ import (
"context"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter"
)
@ -33,7 +34,7 @@ func NewTraceExporterProcessor(traceExporters ...exporter.TraceExporter) SpanPro
}
func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
ack, err := sp.tes.ReceiveSpans(context.Background(), batch.Node, batch.Spans...)
ack, err := sp.tes.ReceiveTraceData(context.Background(), data.TraceData{Node: batch.Node, Resource: batch.Resource, Spans: batch.Spans})
if err != nil {
return ack.DroppedSpans, err
}

View File

@ -17,9 +17,8 @@ package processor
import (
"context"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/receiver"
)
@ -38,10 +37,10 @@ func WrapWithSpanSink(format string, p SpanProcessor) receiver.TraceReceiverSink
}
}
func (ps *protoProcessorSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
func (ps *protoProcessorSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
batch := &agenttracepb.ExportTraceServiceRequest{
Node: node,
Spans: spans,
Node: td.Node,
Spans: td.Spans,
}
failures, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat)

View File

@ -23,8 +23,7 @@ import (
"contrib.go.opencensus.io/exporter/ocagent"
"go.opencensus.io/trace"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/receiver/opencensus"
)
@ -95,9 +94,9 @@ type logSpanSink int
var _ receiver.TraceReceiverSink = (*logSpanSink)(nil)
func (lsr *logSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
spansBlob, _ := json.MarshalIndent(spans, " ", " ")
log.Printf("\n****\nNode: %#v\nSpans: %s\n****\n", node, spansBlob)
func (lsr *logSpanSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
spansBlob, _ := json.MarshalIndent(td.Spans, " ", " ")
log.Printf("\n****\nNode: %#v\nSpans: %s\n****\n", td.Node, spansBlob)
return &receiver.TraceReceiverAcknowledgement{SavedSpans: uint64(len(spans))}, nil
return &receiver.TraceReceiverAcknowledgement{SavedSpans: uint64(len(td.Spans))}, nil
}

View File

@ -29,6 +29,7 @@ import (
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/translator/trace"
)
@ -177,7 +178,7 @@ func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch)
if err == nil && octrace != nil {
ok = true
jr.spanSink.ReceiveSpans(ctx, octrace.Node, octrace.Spans...)
jr.spanSink.ReceiveTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
}
jbsr = append(jbsr, &jaeger.BatchSubmitResponse{

View File

@ -30,6 +30,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
jaegerreceiver "github.com/census-instrumentation/opencensus-service/receiver/jaeger"
@ -207,17 +208,17 @@ type concurrentSpanSink struct {
var _ receiver.TraceReceiverSink = (*concurrentSpanSink)(nil)
func (css *concurrentSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
func (css *concurrentSpanSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
css.mu.Lock()
defer css.mu.Unlock()
css.traces = append(css.traces, &agenttracepb.ExportTraceServiceRequest{
Node: node,
Spans: spans,
Node: td.Node,
Spans: td.Spans,
})
ack := &receiver.TraceReceiverAcknowledgement{
SavedSpans: uint64(len(spans)),
SavedSpans: uint64(len(td.Spans)),
}
return ack, nil

View File

@ -27,6 +27,7 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
)
@ -52,12 +53,6 @@ func New(sr receiver.MetricsReceiverSink, opts ...Option) (*Receiver, error) {
var _ agentmetricspb.MetricsServiceServer = (*Receiver)(nil)
type bundledMetrics struct {
metrics []*metricspb.Metric
node *commonpb.Node
resource *resourcepb.Resource
}
var errMetricsExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
const receiverName = "opencensus_metrics"
@ -68,7 +63,7 @@ func (ocr *Receiver) Export(mes agentmetricspb.MetricsService_ExportServer) erro
// The bundler will receive batches of metrics i.e. []*metricspb.Metric
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := internal.ContextWithReceiverName(mes.Context(), receiverName)
metricsBundler := bundler.NewBundler((*bundledMetrics)(nil), func(payload interface{}) {
metricsBundler := bundler.NewBundler((*data.MetricsData)(nil), func(payload interface{}) {
ocr.batchMetricExporting(ctxWithReceiverName, payload)
})
@ -123,14 +118,14 @@ func (ocr *Receiver) Export(mes agentmetricspb.MetricsService_ExportServer) erro
func processReceivedMetrics(ni *commonpb.Node, resource *resourcepb.Resource, metrics []*metricspb.Metric, bundler *bundler.Bundler) {
// Firstly, we'll add them to the bundler.
if len(metrics) > 0 {
bundlerPayload := &bundledMetrics{node: ni, metrics: metrics, resource: resource}
bundler.Add(bundlerPayload, len(bundlerPayload.metrics))
bundlerPayload := &data.MetricsData{Node: ni, Metrics: metrics, Resource: resource}
bundler.Add(bundlerPayload, len(bundlerPayload.Metrics))
}
}
func (ocr *Receiver) batchMetricExporting(longLivedRPCCtx context.Context, payload interface{}) {
bms := payload.([]*bundledMetrics)
if len(bms) == 0 {
mds := payload.([]*data.MetricsData)
if len(mds) == 0 {
return
}
@ -146,9 +141,9 @@ func (ocr *Receiver) batchMetricExporting(longLivedRPCCtx context.Context, paylo
internal.SetParentLink(longLivedRPCCtx, span)
nMetrics := int64(0)
for _, bm := range bms {
ocr.metricSink.ReceiveMetrics(ctx, bm.node, bm.resource, bm.metrics...)
nMetrics += int64(len(bm.metrics))
for _, md := range mds {
ocr.metricSink.ReceiveMetricsData(ctx, *md)
nMetrics += int64(len(md.Metrics))
}
span.Annotate([]trace.Attribute{

View File

@ -34,7 +34,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/receiver/opencensus/ocmetrics"
@ -327,13 +327,13 @@ func newMetricAppender() *metricAppender {
var _ receiver.MetricsReceiverSink = (*metricAppender)(nil)
func (sa *metricAppender) ReceiveMetrics(ctx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics ...*metricspb.Metric) (*receiver.MetricsReceiverAcknowledgement, error) {
func (sa *metricAppender) ReceiveMetricsData(ctx context.Context, md data.MetricsData) (*receiver.MetricsReceiverAcknowledgement, error) {
sa.Lock()
defer sa.Unlock()
sa.metricsPerNode[node] = append(sa.metricsPerNode[node], metrics...)
sa.metricsPerNode[md.Node] = append(sa.metricsPerNode[md.Node], md.Metrics...)
return &receiver.MetricsReceiverAcknowledgement{SavedMetrics: uint64(len(metrics))}, nil
return &receiver.MetricsReceiverAcknowledgement{SavedMetrics: uint64(len(md.Metrics))}, nil
}
func ocReceiverOnGRPCServer(t *testing.T, sr receiver.MetricsReceiverSink, opts ...ocmetrics.Option) (oci *ocmetrics.Receiver, port int, done func()) {

View File

@ -25,7 +25,9 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
)
@ -59,11 +61,6 @@ func (oci *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
return errUnimplemented
}
type spansAndNode struct {
spans []*tracepb.Span
node *commonpb.Node
}
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
const receiverName = "opencensus_trace"
@ -74,7 +71,7 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
// The bundler will receive batches of spans i.e. []*tracepb.Span
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := internal.ContextWithReceiverName(tes.Context(), receiverName)
traceBundler := bundler.NewBundler((*spansAndNode)(nil), func(payload interface{}) {
traceBundler := bundler.NewBundler((*data.TraceData)(nil), func(payload interface{}) {
oci.batchSpanExporting(ctxWithReceiverName, payload)
})
@ -104,11 +101,11 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), receiverName)
processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
processReceivedSpans := func(ni *commonpb.Node, resource *resourcepb.Resource, spans []*tracepb.Span) {
// Firstly, we'll add them to the bundler.
if len(spans) > 0 {
bundlerPayload := &spansAndNode{node: ni, spans: spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
bundlerPayload := &data.TraceData{Node: ni, Resource: resource, Spans: spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.Spans))
}
// We MUST unconditionally record metrics from this reception.
@ -116,6 +113,7 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
}
var lastNonNilNode *commonpb.Node
var resource *resourcepb.Resource
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
for {
// If a Node has been sent from downstream, save and use it.
@ -123,7 +121,13 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
lastNonNilNode = recv.Node
}
processReceivedSpans(lastNonNilNode, recv.Spans)
// TODO(songya): differentiate between unset and nil resource. See
// https://github.com/census-instrumentation/opencensus-proto/issues/146.
if recv.Resource != nil {
resource = recv.Resource
}
processReceivedSpans(lastNonNilNode, resource, recv.Spans)
recv, err = tes.Recv()
if err != nil {
@ -133,8 +137,8 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
}
func (oci *Receiver) batchSpanExporting(longLivedRPCCtx context.Context, payload interface{}) {
spnL := payload.([]*spansAndNode)
if len(spnL) == 0 {
tracedata := payload.([]*data.TraceData)
if len(tracedata) == 0 {
return
}
@ -150,9 +154,9 @@ func (oci *Receiver) batchSpanExporting(longLivedRPCCtx context.Context, payload
internal.SetParentLink(longLivedRPCCtx, span)
nSpans := int64(0)
for _, spn := range spnL {
oci.spanSink.ReceiveSpans(ctx, spn.node, spn.spans...)
nSpans += int64(len(spn.spans))
for _, td := range tracedata {
oci.spanSink.ReceiveTraceData(ctx, *td)
nSpans += int64(len(td.Spans))
}
span.Annotate([]trace.Attribute{

View File

@ -36,6 +36,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
"github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace"
@ -447,13 +448,13 @@ func newSpanAppender() *spanAppender {
var _ receiver.TraceReceiverSink = (*spanAppender)(nil)
func (sa *spanAppender) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
func (sa *spanAppender) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
sa.Lock()
defer sa.Unlock()
sa.spansPerNode[node] = append(sa.spansPerNode[node], spans...)
sa.spansPerNode[td.Node] = append(sa.spansPerNode[td.Node], td.Spans...)
return &receiver.TraceReceiverAcknowledgement{SavedSpans: uint64(len(spans))}, nil
return &receiver.TraceReceiverAcknowledgement{SavedSpans: uint64(len(td.Spans))}, nil
}
func ocReceiverOnGRPCServer(t *testing.T, sr receiver.TraceReceiverSink, opts ...octrace.Option) (oci *octrace.Receiver, port int, done func()) {

View File

@ -17,12 +17,8 @@ package receiver
import (
"context"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricpb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
)
// A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter.
@ -42,13 +38,13 @@ type TraceReceiver interface {
StopTraceReception(ctx context.Context) error
}
// TraceReceiverSink is an interface that receives spans from a Node identifier.
// TraceReceiverSink is an interface that receives TraceData.
type TraceReceiverSink interface {
ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*TraceReceiverAcknowledgement, error)
ReceiveTraceData(ctx context.Context, tracedata data.TraceData) (*TraceReceiverAcknowledgement, error)
}
// TraceReceiverAcknowledgement struct reports the number of saved and dropped spans in a
// ReceiveSpans call.
// ReceiveTraceData call.
type TraceReceiverAcknowledgement struct {
SavedSpans uint64
DroppedSpans uint64
@ -66,13 +62,13 @@ type MetricsReceiver interface {
StopMetricsReception(ctx context.Context) error
}
// MetricsReceiverSink is an interface that receives metrics from a Node identifier.
// MetricsReceiverSink is an interface that receives MetricsData.
type MetricsReceiverSink interface {
ReceiveMetrics(ctx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics ...*metricpb.Metric) (*MetricsReceiverAcknowledgement, error)
ReceiveMetricsData(ctx context.Context, metricsdata data.MetricsData) (*MetricsReceiverAcknowledgement, error)
}
// MetricsReceiverAcknowledgement struct reports the number of saved and dropped spans in a
// ReceiveSpans call.
// MetricsReceiverAcknowledgement struct reports the number of saved and dropped metrics in a
// ReceiveMetricsData call.
type MetricsReceiverAcknowledgement struct {
SavedMetrics uint64
DroppedMetrics uint64

View File

@ -38,6 +38,7 @@ import (
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/receiver"
)
@ -247,9 +248,9 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "zipkin")
// Now translate them into tracepb.Span
// Now translate them into TraceData
for _, ereq := range ereqs {
zr.spanSink.ReceiveSpans(ctx, ereq.Node, ereq.Spans...)
zr.spanSink.ReceiveTraceData(ctx, data.TraceData{Node: ereq.Node, Spans: ereq.Spans})
// We MUST unconditionally record metrics from this reception.
spansMetricsFn(ereq.Node, ereq.Spans)
}

View File

@ -33,6 +33,7 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/internal/testutils"
"github.com/census-instrumentation/opencensus-service/receiver"
@ -395,6 +396,6 @@ type noopSink int
var _ receiver.TraceReceiverSink = (*noopSink)(nil)
func (ns *noopSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*receiver.TraceReceiverAcknowledgement, error) {
func (ns *noopSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
return nil, nil
}