Rename [Trace|Metrics]DataProcessor to [Trace|Metrics]Consumer, add TraceProcessor similar to receiver,exporter. (#473)

* Rename [Trace|Metrics]DataProcessor to [Trace|Metrics]Processor

* Rename TraceProcessor to TraceConsumer as discussed.

* Add empty_test for processor.
This commit is contained in:
Bogdan Drutu 2019-03-05 17:36:22 -08:00 committed by GitHub
parent 8f26866f24
commit 62b0ae5be7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 420 additions and 335 deletions

View File

@ -33,12 +33,13 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/internal/config" "github.com/census-instrumentation/opencensus-service/internal/config"
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils" "github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
"github.com/census-instrumentation/opencensus-service/internal/pprofserver" "github.com/census-instrumentation/opencensus-service/internal/pprofserver"
"github.com/census-instrumentation/opencensus-service/internal/version" "github.com/census-instrumentation/opencensus-service/internal/version"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor" "github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
"github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver" "github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver"
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver" "github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
"github.com/census-instrumentation/opencensus-service/receiver/prometheusreceiver" "github.com/census-instrumentation/opencensus-service/receiver/prometheusreceiver"
@ -117,8 +118,8 @@ func runOCAgent() {
log.Fatalf("Config: failed to create exporters from YAML: %v", err) log.Fatalf("Config: failed to create exporters from YAML: %v", err)
} }
commonSpanSink := processor.NewMultiTraceDataProcessor(traceExporters) commonSpanSink := multiconsumer.NewTraceProcessor(traceExporters)
commonMetricsSink := processor.NewMultiMetricsDataProcessor(metricsExporters) commonMetricsSink := multiconsumer.NewMetricsProcessor(metricsExporters)
// Add other receivers here as they are implemented // Add other receivers here as they are implemented
ocReceiverDoneFn, err := runOCReceiver(logger, &agentConfig, commonSpanSink, commonMetricsSink) ocReceiverDoneFn, err := runOCReceiver(logger, &agentConfig, commonSpanSink, commonMetricsSink)
@ -213,7 +214,7 @@ func runZPages(port int) func() error {
return srv.Close return srv.Close
} }
func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp processor.TraceDataProcessor, mdp processor.MetricsDataProcessor) (doneFn func() error, err error) { func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp consumer.TraceConsumer, mdp consumer.MetricsConsumer) (doneFn func() error, err error) {
tlsCredsOption, hasTLSCreds, err := acfg.OpenCensusReceiverTLSCredentialsServerOption() tlsCredsOption, hasTLSCreds, err := acfg.OpenCensusReceiverTLSCredentialsServerOption()
if err != nil { if err != nil {
return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err) return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
@ -270,7 +271,7 @@ func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp processor.TraceD
return doneFn, nil return doneFn, nil
} }
func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next processor.TraceDataProcessor) (doneFn func() error, err error) { func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next consumer.TraceConsumer) (doneFn func() error, err error) {
jtr, err := jaegerreceiver.New(context.Background(), &jaegerreceiver.Configuration{ jtr, err := jaegerreceiver.New(context.Background(), &jaegerreceiver.Configuration{
CollectorThriftPort: collectorThriftPort, CollectorThriftPort: collectorThriftPort,
CollectorHTTPPort: collectorHTTPPort, CollectorHTTPPort: collectorHTTPPort,
@ -292,7 +293,7 @@ func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next processo
return doneFn, nil return doneFn, nil
} }
func runZipkinReceiver(addr string, next processor.TraceDataProcessor) (doneFn func() error, err error) { func runZipkinReceiver(addr string, next consumer.TraceConsumer) (doneFn func() error, err error) {
zi, err := zipkinreceiver.New(addr) zi, err := zipkinreceiver.New(addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create the Zipkin receiver: %v", err) return nil, fmt.Errorf("failed to create the Zipkin receiver: %v", err)
@ -308,7 +309,7 @@ func runZipkinReceiver(addr string, next processor.TraceDataProcessor) (doneFn f
return doneFn, nil return doneFn, nil
} }
func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next processor.TraceDataProcessor) (doneFn func() error, err error) { func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next consumer.TraceConsumer) (doneFn func() error, err error) {
zs, err := scribe.NewReceiver(config.Address, config.Port, config.Category) zs, err := scribe.NewReceiver(config.Address, config.Port, config.Category)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create the Zipkin Scribe receiver: %v", err) return nil, fmt.Errorf("failed to create the Zipkin Scribe receiver: %v", err)
@ -324,7 +325,7 @@ func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next processor
return doneFn, nil return doneFn, nil
} }
func runPrometheusReceiver(v *viper.Viper, next processor.MetricsDataProcessor) (doneFn func() error, err error) { func runPrometheusReceiver(v *viper.Viper, next consumer.MetricsConsumer) (doneFn func() error, err error) {
pmr, err := prometheusreceiver.New(v.Sub("receivers.prometheus")) pmr, err := prometheusreceiver.New(v.Sub("receivers.prometheus"))
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -26,6 +26,7 @@ import (
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/sender" "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/sender"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/loggingexporter" "github.com/census-instrumentation/opencensus-service/exporter/loggingexporter"
"github.com/census-instrumentation/opencensus-service/internal/collector/processor" "github.com/census-instrumentation/opencensus-service/internal/collector/processor"
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/nodebatcher" "github.com/census-instrumentation/opencensus-service/internal/collector/processor/nodebatcher"
@ -33,10 +34,9 @@ import (
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/tailsampling" "github.com/census-instrumentation/opencensus-service/internal/collector/processor/tailsampling"
"github.com/census-instrumentation/opencensus-service/internal/collector/sampling" "github.com/census-instrumentation/opencensus-service/internal/collector/sampling"
"github.com/census-instrumentation/opencensus-service/internal/config" "github.com/census-instrumentation/opencensus-service/internal/config"
mainprocessor "github.com/census-instrumentation/opencensus-service/processor"
) )
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []mainprocessor.TraceDataProcessor, []mainprocessor.MetricsDataProcessor) { func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []consumer.TraceConsumer, []consumer.MetricsConsumer) {
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility. // TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
traceExporters, metricsExporters, doneFns, err := config.ExportersFromViperConfig(logger, v) traceExporters, metricsExporters, doneFns, err := config.ExportersFromViperConfig(logger, v)
if err != nil { if err != nil {

37
consumer/consumer.go Normal file
View File

@ -0,0 +1,37 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumer
import (
"context"
"github.com/census-instrumentation/opencensus-service/data"
)
// MetricsConsumer is an interface that receives data.MetricsData, process it as needed, and
// sends it to the next processing node if any or to the destination.
//
// ConsumeMetricsData receives data.MetricsData for processing by the MetricsConsumer.
type MetricsConsumer interface {
ConsumeMetricsData(ctx context.Context, md data.MetricsData) error
}
// TraceConsumer is an interface that receives data.TraceData, process it as needed, and
// sends it to the next processing node if any or to the destination.
//
// ConsumeTraceData receives data.TraceData for processing by the TraceConsumer.
type TraceConsumer interface {
ConsumeTraceData(ctx context.Context, td data.TraceData) error
}

17
consumer/empty_test.go Normal file
View File

@ -0,0 +1,17 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumer
// Package with interface only.

View File

@ -26,9 +26,9 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
const defaultVersionForAWSXRayApplications = "latest" const defaultVersionForAWSXRayApplications = "latest"
@ -57,11 +57,11 @@ type awsXRayExporter struct {
defaultOptions []xray.Option defaultOptions []xray.Option
} }
var _ processor.TraceDataProcessor = (*awsXRayExporter)(nil) var _ consumer.TraceConsumer = (*awsXRayExporter)(nil)
// AWSXRayTraceExportersFromViper unmarshals the viper and returns an processor.TraceDataProcessor targeting // AWSXRayTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
// AWS X-Ray according to the configuration settings. // AWS X-Ray according to the configuration settings.
func AWSXRayTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func AWSXRayTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
AWSXRay *awsXRayConfig `mapstructure:"aws-xray"` AWSXRay *awsXRayConfig `mapstructure:"aws-xray"`
} }
@ -84,7 +84,7 @@ func AWSXRayTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataP
defaultServiceName: xc.DefaultServiceName, defaultServiceName: xc.DefaultServiceName,
} }
tdps = append(tdps, axe) tps = append(tps, axe)
doneFns = append(doneFns, func() error { doneFns = append(doneFns, func() error {
axe.Flush() axe.Flush()
return nil return nil
@ -146,7 +146,7 @@ func transformConfigToXRayOptions(axrCfg *awsXRayConfig) (xopts []xray.Option, e
// ExportSpans is the method that translates OpenCensus-Proto Traces into AWS X-Ray spans. // ExportSpans is the method that translates OpenCensus-Proto Traces into AWS X-Ray spans.
// It uniquely maintains // It uniquely maintains
func (axe *awsXRayExporter) ProcessTraceData(ctx context.Context, td data.TraceData) (xerr error) { func (axe *awsXRayExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) (xerr error) {
ctx, span := trace.StartSpan(ctx, ctx, span := trace.StartSpan(ctx,
"opencensus.service.exporter.aws_xray.ExportSpans", "opencensus.service.exporter.aws_xray.ExportSpans",
trace.WithSampler(trace.NeverSample())) trace.WithSampler(trace.NeverSample()))

View File

@ -18,8 +18,8 @@ import (
datadog "github.com/DataDog/opencensus-go-exporter-datadog" datadog "github.com/DataDog/opencensus-go-exporter-datadog"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type datadogConfig struct { type datadogConfig struct {
@ -43,7 +43,7 @@ type datadogConfig struct {
// DatadogTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting // DatadogTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
// Datadog according to the configuration settings. // Datadog according to the configuration settings.
func DatadogTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func DatadogTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Datadog *datadogConfig `mapstructure:"datadog,omitempty"` Datadog *datadogConfig `mapstructure:"datadog,omitempty"`
} }
@ -74,7 +74,7 @@ func DatadogTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataP
// TODO: Examine the Datadog exporter to see // TODO: Examine the Datadog exporter to see
// if trace.ExportSpan was constraining and if perhaps the // if trace.ExportSpan was constraining and if perhaps the
// upload can use the context and information from the Node. // upload can use the context and information from the Node.
tdps = append(tdps, exporterwrapper.NewExporterWrapper("datadog", de)) tps = append(tps, exporterwrapper.NewExporterWrapper("datadog", de))
// TODO: (@odeke-em, @songya23) implement ExportMetrics for Datadog. // TODO: (@odeke-em, @songya23) implement ExportMetrics for Datadog.
// mes = append(mes, oexp) // mes = append(mes, oexp)

39
exporter/exporter.go Normal file
View File

@ -0,0 +1,39 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exporter
import (
"github.com/census-instrumentation/opencensus-service/consumer"
)
// TraceExporter composes TraceConsumer with some additional exporter-specific functions.
type TraceExporter interface {
consumer.TraceConsumer
// TraceExportFormat gets the name of the format in which this exporter sends its data.
// For exporters that can export multiple signals it is recommended to encode the signal
// as suffix (e.g. "oc_trace").
TraceExportFormat() string
}
// MetricsExporter composes MetricsConsumer with some additional exporter-specific functions.
type MetricsExporter interface {
consumer.MetricsConsumer
// MetricsExportFormat gets the name of the format in which this exporter sends its data.
// For exporters that can export multiple signals it is recommended to encode the signal
// as suffix (e.g. "oc_metrics").
MetricsExportFormat() string
}

View File

@ -26,11 +26,11 @@ type nopExporter int
var _ exporter.TraceExporter = (*nopExporter)(nil) var _ exporter.TraceExporter = (*nopExporter)(nil)
var _ exporter.MetricsExporter = (*nopExporter)(nil) var _ exporter.MetricsExporter = (*nopExporter)(nil)
func (ne *nopExporter) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (ne *nopExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
return nil return nil
} }
func (ne *nopExporter) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (ne *nopExporter) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
return nil return nil
} }

View File

@ -27,7 +27,7 @@ func TestNopTraceExporterNoErrors(t *testing.T) {
td := data.TraceData{ td := data.TraceData{
Spans: make([]*tracepb.Span, 7), Spans: make([]*tracepb.Span, 7),
} }
if err := nte.ProcessTraceData(context.Background(), td); err != nil { if err := nte.ConsumeTraceData(context.Background(), td); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }
@ -42,7 +42,7 @@ func TestNoopMetricsExporterNoErrors(t *testing.T) {
md := data.MetricsData{ md := data.MetricsData{
Metrics: make([]*metricspb.Metric, 7), Metrics: make([]*metricspb.Metric, 7),
} }
if err := nme.ProcessMetricsData(context.Background(), md); err != nil { if err := nme.ConsumeMetricsData(context.Background(), md); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }

View File

@ -30,8 +30,8 @@ type SinkTraceExporter struct {
var _ exporter.TraceExporter = (*SinkTraceExporter)(nil) var _ exporter.TraceExporter = (*SinkTraceExporter)(nil)
// ProcessTraceData stores traces for tests. // ConsumeTraceData stores traces for tests.
func (ste *SinkTraceExporter) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (ste *SinkTraceExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
ste.mu.Lock() ste.mu.Lock()
defer ste.mu.Unlock() defer ste.mu.Unlock()
@ -66,8 +66,8 @@ type SinkMetricsExporter struct {
var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil) var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil)
// ProcessMetricsData stores traces for tests. // ConsumeMetricsData stores traces for tests.
func (sme *SinkMetricsExporter) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (sme *SinkMetricsExporter) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
sme.mu.Lock() sme.mu.Lock()
defer sme.mu.Unlock() defer sme.mu.Unlock()

View File

@ -30,7 +30,7 @@ func TestSinkTraceExporter(t *testing.T) {
} }
want := make([]data.TraceData, 0, 7) want := make([]data.TraceData, 0, 7)
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
if err := sink.ProcessTraceData(context.Background(), td); err != nil { if err := sink.ConsumeTraceData(context.Background(), td); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }
@ -53,7 +53,7 @@ func TestSinkMetricsExporter(t *testing.T) {
} }
want := make([]data.MetricsData, 0, 7) want := make([]data.MetricsData, 0, 7)
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
if err := sink.ProcessMetricsData(context.Background(), md); err != nil { if err := sink.ConsumeMetricsData(context.Background(), md); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }

View File

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// Package exporterwrapper provides support for wrapping OC go library trace.Exporter into a // Package exporterwrapper provides support for wrapping OC go library trace.Exporter into a
// processor.TraceDataProcessor. // consumer.TraceConsumer.
// For now it currently only provides statically imported OpenCensus // For now it currently only provides statically imported OpenCensus
// exporters like: // exporters like:
// * Stackdriver Tracing and Monitoring // * Stackdriver Tracing and Monitoring
@ -27,13 +27,13 @@ import (
"go.opencensus.io/trace" "go.opencensus.io/trace"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/processor"
spandatatranslator "github.com/census-instrumentation/opencensus-service/translator/trace/spandata" spandatatranslator "github.com/census-instrumentation/opencensus-service/translator/trace/spandata"
) )
// NewExporterWrapper returns a processor.TraceDataProcessor that converts OpenCensus Proto TraceData // NewExporterWrapper returns a consumer.TraceConsumer that converts OpenCensus Proto TraceData
// to OpenCensus-Go SpanData and calls into the given trace.Exporter. // to OpenCensus-Go SpanData and calls into the given trace.Exporter.
// //
// This is a bootstrapping mechanism for us to re-use as many of // This is a bootstrapping mechanism for us to re-use as many of
@ -41,7 +41,7 @@ import (
// by various vendors and contributors. Eventually the goal is to // by various vendors and contributors. Eventually the goal is to
// get those exporters converted to directly receive // get those exporters converted to directly receive
// OpenCensus Proto TraceData. // OpenCensus Proto TraceData.
func NewExporterWrapper(exporterName string, ocExporter trace.Exporter) processor.TraceDataProcessor { func NewExporterWrapper(exporterName string, ocExporter trace.Exporter) consumer.TraceConsumer {
return &ocExporterWrapper{spanName: "opencensus.service.exporter." + exporterName + ".ExportTrace", ocExporter: ocExporter} return &ocExporterWrapper{spanName: "opencensus.service.exporter." + exporterName + ".ExportTrace", ocExporter: ocExporter}
} }
@ -50,9 +50,9 @@ type ocExporterWrapper struct {
ocExporter trace.Exporter ocExporter trace.Exporter
} }
var _ processor.TraceDataProcessor = (*ocExporterWrapper)(nil) var _ consumer.TraceConsumer = (*ocExporterWrapper)(nil)
func (octew *ocExporterWrapper) ProcessTraceData(ctx context.Context, td data.TraceData) (aerr error) { func (octew *ocExporterWrapper) ConsumeTraceData(ctx context.Context, td data.TraceData) (aerr error) {
ctx, span := trace.StartSpan(ctx, ctx, span := trace.StartSpan(ctx,
octew.spanName, trace.WithSampler(trace.NeverSample())) octew.spanName, trace.WithSampler(trace.NeverSample()))

View File

@ -16,23 +16,8 @@ package exporter
import ( import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
// TraceExporter composes TraceDataProcessor with some additional
// exporter-specific functions. This helps the service core to identify which
// TraceDataProcessors are Exporters and which are internal processing
// components, so that better validation of pipelines can be done.
type TraceExporter interface {
processor.TraceDataProcessor
// TraceExportFormat gets the name of the format in which this exporter sends its data.
// For exporters that can export multiple signals it is recommended to encode the signal
// as suffix (e.g. "oc_trace").
TraceExportFormat() string
}
// TraceExporterFactory is an interface that builds a new TraceExporter based on // TraceExporterFactory is an interface that builds a new TraceExporter based on
// some viper.Viper configuration. // some viper.Viper configuration.
type TraceExporterFactory interface { type TraceExporterFactory interface {
@ -45,19 +30,6 @@ type TraceExporterFactory interface {
DefaultConfig() *viper.Viper DefaultConfig() *viper.Viper
} }
// MetricsExporter composes MetricsDataProcessor with some additional
// exporter-specific functions. This helps the service core to identify which
// MetricsDataProcessors are Exporters and which are internal processing
// components, so that better validation of pipelines can be done.
type MetricsExporter interface {
processor.MetricsDataProcessor
// MetricsExportFormat gets the name of the format in which this exporter sends its data.
// For exporters that can export multiple signals it is recommended to encode the signal
// as suffix (e.g. "oc_metrics").
MetricsExportFormat() string
}
// MetricsExporterFactory is an interface that builds a new MetricsExporter based on // MetricsExporterFactory is an interface that builds a new MetricsExporter based on
// some viper.Viper configuration. // some viper.Viper configuration.
type MetricsExporterFactory interface { type MetricsExporterFactory interface {

View File

@ -21,8 +21,8 @@ import (
"github.com/honeycombio/opencensus-exporter/honeycomb" "github.com/honeycombio/opencensus-exporter/honeycomb"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type honeycombConfig struct { type honeycombConfig struct {
@ -32,7 +32,7 @@ type honeycombConfig struct {
// HoneycombTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter // HoneycombTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter
// targeting Honeycomb according to the configuration settings. // targeting Honeycomb according to the configuration settings.
func HoneycombTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func HoneycombTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Honeycomb *honeycombConfig `mapstructure:"honeycomb"` Honeycomb *honeycombConfig `mapstructure:"honeycomb"`
} }
@ -47,7 +47,7 @@ func HoneycombTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDat
rawExp := honeycomb.NewExporter(hc.WriteKey, hc.DatasetName) rawExp := honeycomb.NewExporter(hc.WriteKey, hc.DatasetName)
tdps = append(tdps, exporterwrapper.NewExporterWrapper("honeycomb", rawExp)) tps = append(tps, exporterwrapper.NewExporterWrapper("honeycomb", rawExp))
doneFns = append(doneFns, func() error { doneFns = append(doneFns, func() error {
rawExp.Close() rawExp.Close()
return nil return nil

View File

@ -18,8 +18,8 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"go.opencensus.io/exporter/jaeger" "go.opencensus.io/exporter/jaeger"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
// Slight modified version of go/src/go.opencensus.io/exporter/jaeger/jaeger.go // Slight modified version of go/src/go.opencensus.io/exporter/jaeger/jaeger.go
@ -32,7 +32,7 @@ type jaegerConfig struct {
// JaegerExportersFromViper unmarshals the viper and returns exporter.TraceExporters targeting // JaegerExportersFromViper unmarshals the viper and returns exporter.TraceExporters targeting
// Jaeger according to the configuration settings. // Jaeger according to the configuration settings.
func JaegerExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func JaegerExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Jaeger *jaegerConfig `mapstructure:"jaeger"` Jaeger *jaegerConfig `mapstructure:"jaeger"`
} }
@ -64,6 +64,6 @@ func JaegerExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcess
// TODO: Examine "contrib.go.opencensus.io/exporter/jaeger" to see // TODO: Examine "contrib.go.opencensus.io/exporter/jaeger" to see
// if trace.ExportSpan was constraining and if perhaps the Jaeger // if trace.ExportSpan was constraining and if perhaps the Jaeger
// upload can use the context and information from the Node. // upload can use the context and information from the Node.
tdps = append(tdps, exporterwrapper.NewExporterWrapper("jaeger", je)) tps = append(tps, exporterwrapper.NewExporterWrapper("jaeger", je))
return return
} }

View File

@ -20,8 +20,8 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
kafka "github.com/yancl/opencensus-go-exporter-kafka" kafka "github.com/yancl/opencensus-go-exporter-kafka"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type kafkaConfig struct { type kafkaConfig struct {
@ -29,9 +29,9 @@ type kafkaConfig struct {
Topic string `mapstructure:"topic,omitempty"` Topic string `mapstructure:"topic,omitempty"`
} }
// KafkaExportersFromViper unmarshals the viper and returns an processor.TraceDataProcessor targeting // KafkaExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
// Kafka according to the configuration settings. // Kafka according to the configuration settings.
func KafkaExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func KafkaExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Kafka *kafkaConfig `mapstructure:"kafka"` Kafka *kafkaConfig `mapstructure:"kafka"`
} }
@ -53,7 +53,7 @@ func KafkaExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcesso
return nil, nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr) return nil, nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
} }
tdps = append(tdps, exporterwrapper.NewExporterWrapper("kafka", kde)) tps = append(tps, exporterwrapper.NewExporterWrapper("kafka", kde))
doneFns = append(doneFns, func() error { doneFns = append(doneFns, func() error {
kde.Flush() kde.Flush()
return nil return nil

View File

@ -34,7 +34,7 @@ type loggingExporter struct{ logger *zap.Logger }
var _ exporter.TraceExporter = (*loggingExporter)(nil) var _ exporter.TraceExporter = (*loggingExporter)(nil)
var _ exporter.MetricsExporter = (*loggingExporter)(nil) var _ exporter.MetricsExporter = (*loggingExporter)(nil)
func (le *loggingExporter) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (le *loggingExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
le.logger.Debug("loggingTraceExporter", zap.Int("#spans", len(td.Spans))) le.logger.Debug("loggingTraceExporter", zap.Int("#spans", len(td.Spans)))
// TODO: Add ability to record the received data // TODO: Add ability to record the received data
@ -43,7 +43,7 @@ func (le *loggingExporter) ProcessTraceData(ctx context.Context, td data.TraceDa
return nil return nil
} }
func (le *loggingExporter) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (le *loggingExporter) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
le.logger.Debug("loggingMetricsExporter", zap.Int("#metrics", len(md.Metrics))) le.logger.Debug("loggingMetricsExporter", zap.Int("#metrics", len(md.Metrics)))
// TODO: Add ability to record the received data // TODO: Add ability to record the received data
// TODO: Record metrics // TODO: Record metrics

View File

@ -29,7 +29,7 @@ func TestLoggingTraceExporterNoErrors(t *testing.T) {
td := data.TraceData{ td := data.TraceData{
Spans: make([]*tracepb.Span, 7), Spans: make([]*tracepb.Span, 7),
} }
if err := lte.ProcessTraceData(context.Background(), td); err != nil { if err := lte.ConsumeTraceData(context.Background(), td); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }
@ -48,7 +48,7 @@ func TestLoggingMetricsExporterNoErrors(t *testing.T) {
md := data.MetricsData{ md := data.MetricsData{
Metrics: make([]*metricspb.Metric, 7), Metrics: make([]*metricspb.Metric, 7),
} }
if err := lme.ProcessMetricsData(context.Background(), md); err != nil { if err := lme.ConsumeMetricsData(context.Background(), md); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }

View File

@ -25,11 +25,11 @@ import (
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal/compression" "github.com/census-instrumentation/opencensus-service/internal/compression"
"github.com/census-instrumentation/opencensus-service/internal/compression/grpc" "github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type opencensusConfig struct { type opencensusConfig struct {
@ -60,11 +60,11 @@ var (
ErrUnableToGetTLSCreds = errors.New("OpenCensus exporter unable to read TLS credentials") ErrUnableToGetTLSCreds = errors.New("OpenCensus exporter unable to read TLS credentials")
) )
var _ processor.TraceDataProcessor = (*ocagentExporter)(nil) var _ consumer.TraceConsumer = (*ocagentExporter)(nil)
// OpenCensusTraceExportersFromViper unmarshals the viper and returns an processor.TraceDataProcessor targeting // OpenCensusTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
// OpenCensus Agent/Collector according to the configuration settings. // OpenCensus Agent/Collector according to the configuration settings.
func OpenCensusTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
OpenCensus *opencensusConfig `mapstructure:"opencensus"` OpenCensus *opencensusConfig `mapstructure:"opencensus"`
} }
@ -120,16 +120,16 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDa
} }
oexp := &ocagentExporter{exporters: exporters} oexp := &ocagentExporter{exporters: exporters}
tdps = append(tdps, oexp) tps = append(tps, oexp)
// TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus. // TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
// mdps = append(mdps, oexp) // mps = append(mps, oexp)
return return
} }
const exporterTagValue = "oc_trace" const exporterTagValue = "oc_trace"
func (oce *ocagentExporter) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (oce *ocagentExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
// Get an exporter worker round-robin // Get an exporter worker round-robin
exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))] exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))]
err := exporter.ExportTraceServiceRequest( err := exporter.ExportTraceServiceRequest(

View File

@ -21,8 +21,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/spf13/viper" "github.com/spf13/viper"
// TODO: once this repository has been transferred to the // TODO: once this repository has been transferred to the
@ -45,10 +45,10 @@ type prometheusConfig struct {
var errBlankPrometheusAddress = errors.New("expecting a non-blank address to run the Prometheus metrics handler") var errBlankPrometheusAddress = errors.New("expecting a non-blank address to run the Prometheus metrics handler")
// PrometheusExportersFromViper unmarshals the viper and returns processor.MetricsDataProcessors // PrometheusExportersFromViper unmarshals the viper and returns consumer.MetricsConsumers
// targeting Prometheus according to the configuration settings. // targeting Prometheus according to the configuration settings.
// It allows HTTP clients to scrape it on endpoint path "/metrics". // It allows HTTP clients to scrape it on endpoint path "/metrics".
func PrometheusExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func PrometheusExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Prometheus *prometheusConfig `mapstructure:"prometheus"` Prometheus *prometheusConfig `mapstructure:"prometheus"`
} }
@ -92,7 +92,7 @@ func PrometheusExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataPro
doneFns = append(doneFns, ln.Close) doneFns = append(doneFns, ln.Close)
pexp := &prometheusExporter{exporter: pe} pexp := &prometheusExporter{exporter: pe}
mdps = append(mdps, pexp) mps = append(mps, pexp)
return return
} }
@ -101,9 +101,9 @@ type prometheusExporter struct {
exporter *prometheus.Exporter exporter *prometheus.Exporter
} }
var _ processor.MetricsDataProcessor = (*prometheusExporter)(nil) var _ consumer.MetricsConsumer = (*prometheusExporter)(nil)
func (pe *prometheusExporter) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
for _, metric := range md.Metrics { for _, metric := range md.Metrics {
_ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric) _ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric)
} }

View File

@ -156,7 +156,7 @@ prometheus:
}, },
} }
me := mes[0] me := mes[0]
me.ProcessMetricsData(context.Background(), data.MetricsData{Metrics: []*metricspb.Metric{metric1}}) me.ConsumeMetricsData(context.Background(), data.MetricsData{Metrics: []*metricspb.Metric{metric1}})
res, err := http.Get("http://localhost:7777/metrics") res, err := http.Get("http://localhost:7777/metrics")
if err != nil { if err != nil {

View File

@ -24,9 +24,9 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper" "github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type stackdriverConfig struct { type stackdriverConfig struct {
@ -41,11 +41,11 @@ type stackdriverExporter struct {
exporter *stackdriver.Exporter exporter *stackdriver.Exporter
} }
var _ processor.MetricsDataProcessor = (*stackdriverExporter)(nil) var _ consumer.MetricsConsumer = (*stackdriverExporter)(nil)
// StackdriverTraceExportersFromViper unmarshals the viper and returns an processor.TraceDataProcessor targeting // StackdriverTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
// Stackdriver according to the configuration settings. // Stackdriver according to the configuration settings.
func StackdriverTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func StackdriverTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Stackdriver *stackdriverConfig `mapstructure:"stackdriver"` Stackdriver *stackdriverConfig `mapstructure:"stackdriver"`
} }
@ -92,11 +92,11 @@ func StackdriverTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceD
// if trace.ExportSpan was constraining and if perhaps the Stackdriver // if trace.ExportSpan was constraining and if perhaps the Stackdriver
// upload can use the context and information from the Node. // upload can use the context and information from the Node.
if sc.EnableTracing { if sc.EnableTracing {
tdps = append(tdps, exporterwrapper.NewExporterWrapper("stackdriver", sde)) tps = append(tps, exporterwrapper.NewExporterWrapper("stackdriver", sde))
} }
if sc.EnableMetrics { if sc.EnableMetrics {
mdps = append(mdps, exp) mps = append(mps, exp)
} }
doneFns = append(doneFns, func() error { doneFns = append(doneFns, func() error {
@ -106,7 +106,7 @@ func StackdriverTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceD
return return
} }
func (sde *stackdriverExporter) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (sde *stackdriverExporter) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
ctx, span := trace.StartSpan(ctx, ctx, span := trace.StartSpan(ctx,
"opencensus.service.exporter.stackdriver.ExportMetricsData", "opencensus.service.exporter.stackdriver.ExportMetricsData",
trace.WithSampler(trace.NeverSample())) trace.WithSampler(trace.NeverSample()))

View File

@ -31,9 +31,9 @@ import (
"go.opencensus.io/trace" "go.opencensus.io/trace"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
spandatatranslator "github.com/census-instrumentation/opencensus-service/translator/trace/spandata" spandatatranslator "github.com/census-instrumentation/opencensus-service/translator/trace/spandata"
) )
@ -79,7 +79,7 @@ func (zc *ZipkinConfig) EndpointURL() string {
// ZipkinExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting // ZipkinExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
// Zipkin according to the configuration settings. // Zipkin according to the configuration settings.
func ZipkinExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcessor, mdps []processor.MetricsDataProcessor, doneFns []func() error, err error) { func ZipkinExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
var cfg struct { var cfg struct {
Zipkin *ZipkinConfig `mapstructure:"zipkin"` Zipkin *ZipkinConfig `mapstructure:"zipkin"`
} }
@ -109,7 +109,7 @@ func ZipkinExportersFromViper(v *viper.Viper) (tdps []processor.TraceDataProcess
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot configure Zipkin exporter: %v", err) return nil, nil, nil, fmt.Errorf("Cannot configure Zipkin exporter: %v", err)
} }
tdps = append(tdps, zle) tps = append(tps, zle)
doneFns = append(doneFns, zle.stop) doneFns = append(doneFns, zle.stop)
return return
} }
@ -192,7 +192,7 @@ func (ze *zipkinExporter) stop() error {
return ze.reporter.Close() return ze.reporter.Close()
} }
func (ze *zipkinExporter) ProcessTraceData(ctx context.Context, td data.TraceData) (zerr error) { func (ze *zipkinExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) (zerr error) {
ctx, span := trace.StartSpan(ctx, ctx, span := trace.StartSpan(ctx,
"opencensus.service.exporter.zipkin.ExportTrace", "opencensus.service.exporter.zipkin.ExportTrace",
trace.WithSampler(trace.NeverSample())) trace.WithSampler(trace.NeverSample()))

View File

@ -34,7 +34,7 @@ import (
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils" "github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
"github.com/census-instrumentation/opencensus-service/internal/testutils" "github.com/census-instrumentation/opencensus-service/internal/testutils"
"github.com/census-instrumentation/opencensus-service/processor" "github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
"github.com/census-instrumentation/opencensus-service/receiver/zipkinreceiver" "github.com/census-instrumentation/opencensus-service/receiver/zipkinreceiver"
) )
@ -173,7 +173,7 @@ zipkin:
t.Fatalf("Failed to create a new Zipkin receiver: %v", err) t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
} }
zexp := processor.NewMultiTraceDataProcessor(tes) zexp := multiconsumer.NewTraceProcessor(tes)
if err := zi.StartTraceReception(context.Background(), zexp); err != nil { if err := zi.StartTraceReception(context.Background(), zexp); err != nil {
t.Fatalf("Failed to start trace reception: %v", err) t.Fatalf("Failed to start trace reception: %v", err)
} }

View File

@ -17,23 +17,25 @@ package processor
import ( import (
"context" "context"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor" "github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
) )
type exporterSpanProcessor struct { type exporterSpanProcessor struct {
tdp processor.TraceDataProcessor tp processor.TraceProcessor
} }
var _ SpanProcessor = (*exporterSpanProcessor)(nil) var _ SpanProcessor = (*exporterSpanProcessor)(nil)
// NewTraceExporterProcessor creates processor that feeds SpanData to the given trace exporters. // NewTraceExporterProcessor creates processor that feeds SpanData to the given trace exporters.
func NewTraceExporterProcessor(traceExporters ...processor.TraceDataProcessor) SpanProcessor { func NewTraceExporterProcessor(traceExporters ...consumer.TraceConsumer) SpanProcessor {
return &exporterSpanProcessor{tdp: processor.NewMultiTraceDataProcessor(traceExporters)} return &exporterSpanProcessor{tp: multiconsumer.NewTraceProcessor(traceExporters)}
} }
func (sp *exporterSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error { func (sp *exporterSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
err := sp.tdp.ProcessTraceData(context.Background(), td) err := sp.tp.ConsumeTraceData(context.Background(), td)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,8 +17,8 @@ package processor
import ( import (
"context" "context"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor"
) )
type protoProcessorSink struct { type protoProcessorSink struct {
@ -26,16 +26,16 @@ type protoProcessorSink struct {
protoProcessor SpanProcessor protoProcessor SpanProcessor
} }
var _ (processor.TraceDataProcessor) = (*protoProcessorSink)(nil) var _ (consumer.TraceConsumer) = (*protoProcessorSink)(nil)
// WrapWithSpanSink wraps a processor to be used as a span sink by receivers. // WrapWithSpanSink wraps a processor to be used as a span sink by receivers.
func WrapWithSpanSink(format string, p SpanProcessor) processor.TraceDataProcessor { func WrapWithSpanSink(format string, p SpanProcessor) consumer.TraceConsumer {
return &protoProcessorSink{ return &protoProcessorSink{
sourceFormat: format, sourceFormat: format,
protoProcessor: p, protoProcessor: p,
} }
} }
func (ps *protoProcessorSink) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (ps *protoProcessorSink) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
return ps.protoProcessor.ProcessSpans(td, ps.sourceFormat) return ps.protoProcessor.ProcessSpans(td, ps.sourceFormat)
} }

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/exporter/awsexporter" "github.com/census-instrumentation/opencensus-service/exporter/awsexporter"
"github.com/census-instrumentation/opencensus-service/exporter/datadogexporter" "github.com/census-instrumentation/opencensus-service/exporter/datadogexporter"
"github.com/census-instrumentation/opencensus-service/exporter/honeycombexporter" "github.com/census-instrumentation/opencensus-service/exporter/honeycombexporter"
@ -35,7 +36,6 @@ import (
"github.com/census-instrumentation/opencensus-service/exporter/prometheusexporter" "github.com/census-instrumentation/opencensus-service/exporter/prometheusexporter"
"github.com/census-instrumentation/opencensus-service/exporter/stackdriverexporter" "github.com/census-instrumentation/opencensus-service/exporter/stackdriverexporter"
"github.com/census-instrumentation/opencensus-service/exporter/zipkinexporter" "github.com/census-instrumentation/opencensus-service/exporter/zipkinexporter"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver" "github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
"github.com/census-instrumentation/opencensus-service/receiver/prometheusreceiver" "github.com/census-instrumentation/opencensus-service/receiver/prometheusreceiver"
) )
@ -443,10 +443,10 @@ func eqLocalHost(host string) bool {
// + prometheus // + prometheus
// + aws-xray // + aws-xray
// + honeycomb // + honeycomb
func ExportersFromViperConfig(logger *zap.Logger, v *viper.Viper) ([]processor.TraceDataProcessor, []processor.MetricsDataProcessor, []func() error, error) { func ExportersFromViperConfig(logger *zap.Logger, v *viper.Viper) ([]consumer.TraceConsumer, []consumer.MetricsConsumer, []func() error, error) {
parseFns := []struct { parseFns := []struct {
name string name string
fn func(*viper.Viper) ([]processor.TraceDataProcessor, []processor.MetricsDataProcessor, []func() error, error) fn func(*viper.Viper) ([]consumer.TraceConsumer, []consumer.MetricsConsumer, []func() error, error)
}{ }{
{name: "datadog", fn: datadogexporter.DatadogTraceExportersFromViper}, {name: "datadog", fn: datadogexporter.DatadogTraceExportersFromViper},
{name: "stackdriver", fn: stackdriverexporter.StackdriverTraceExportersFromViper}, {name: "stackdriver", fn: stackdriverexporter.StackdriverTraceExportersFromViper},
@ -459,8 +459,8 @@ func ExportersFromViperConfig(logger *zap.Logger, v *viper.Viper) ([]processor.T
{name: "honeycomb", fn: honeycombexporter.HoneycombTraceExportersFromViper}, {name: "honeycomb", fn: honeycombexporter.HoneycombTraceExportersFromViper},
} }
var traceExporters []processor.TraceDataProcessor var traceExporters []consumer.TraceConsumer
var metricsExporters []processor.MetricsDataProcessor var metricsExporters []consumer.MetricsConsumer
var doneFns []func() error var doneFns []func() error
exportersViper := v.Sub("exporters") exportersViper := v.Sub("exporters")
if exportersViper == nil { if exportersViper == nil {

View File

@ -103,7 +103,7 @@ func CheckRecordedMetricsForTraceExporter(t *testing.T, te exporter.TraceExporte
ctx := observability.ContextWithReceiverName(context.Background(), fakeReceiverName) ctx := observability.ContextWithReceiverName(context.Background(), fakeReceiverName)
const numBatches = 7 const numBatches = 7
for i := 0; i < numBatches; i++ { for i := 0; i < numBatches; i++ {
if err := te.ProcessTraceData(ctx, td); err != nil { if err := te.ConsumeTraceData(ctx, td); err != nil {
t.Fatalf("Want nil got %v", err) t.Fatalf("Want nil got %v", err)
} }
} }

17
processor/empty_test.go Normal file
View File

@ -0,0 +1,17 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processor
// Package with interface only.

View File

@ -16,28 +16,28 @@ package processor
import "github.com/spf13/viper" import "github.com/spf13/viper"
// TraceDataProcessorFactory is an interface that builds a new TraceDataProcessor based on // TraceProcessorFactory is an interface that builds a new TraceProcessor based on
// some viper.Viper configuration. // some viper.Viper configuration.
type TraceDataProcessorFactory interface { type TraceProcessorFactory interface {
// Type gets the type of the TraceDataProcessor created by this factory. // Type gets the type of the TraceProcessor created by this factory.
Type() string Type() string
// NewFromViper takes a viper.Viper config and creates a new TraceDataProcessor which uses next as // NewFromViper takes a viper.Viper config and creates a new TraceProcessor which uses next as
// the next TraceDataProcessor in the pipeline. // the next TraceProcessor in the pipeline.
NewFromViper(cfg *viper.Viper, next TraceDataProcessor) (TraceDataProcessor, error) NewFromViper(cfg *viper.Viper, next TraceProcessor) (TraceProcessor, error)
// DefaultConfig returns the default configuration for TraceDataProcessors // DefaultConfig returns the default configuration for TraceProcessors
// created by this factory. // created by this factory.
DefaultConfig() *viper.Viper DefaultConfig() *viper.Viper
} }
// MetricsDataProcessorFactory is an interface that builds a new MetricsDataProcessor based on // MetricsProcessorFactory is an interface that builds a new MetricsProcessor based on
// some viper.Viper configuration. // some viper.Viper configuration.
type MetricsDataProcessorFactory interface { type MetricsProcessorFactory interface {
// Type gets the type of the MetricsDataProcessor created by this factory. // Type gets the type of the MetricsProcessor created by this factory.
Type() string Type() string
// NewFromViper takes a viper.Viper config and creates a new MetricsDataProcessor which uses next as // NewFromViper takes a viper.Viper config and creates a new MetricsProcessor which uses next as
// the next MetricsDataProcessor in the pipeline. // the next MetricsProcessor in the pipeline.
NewFromViper(cfg *viper.Viper, next MetricsDataProcessor) (MetricsDataProcessor, error) NewFromViper(cfg *viper.Viper, next MetricsProcessor) (MetricsProcessor, error)
// DefaultConfig returns the default configuration for MetricsDataProcessors // DefaultConfig returns the default configuration for MetricsProcessors
// created by this factory. // created by this factory.
DefaultConfig() *viper.Viper DefaultConfig() *viper.Viper
} }

View File

@ -1,62 +0,0 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processor
import (
"context"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
)
// NewMultiMetricsDataProcessor wraps multiple metrics exporters in a single one.
func NewMultiMetricsDataProcessor(mdps []MetricsDataProcessor) MetricsDataProcessor {
return metricsDataProcessors(mdps)
}
type metricsDataProcessors []MetricsDataProcessor
var _ MetricsDataProcessor = (*metricsDataProcessors)(nil)
// ExportMetricsData exports the MetricsData to all exporters wrapped by the current one.
func (mdps metricsDataProcessors) ProcessMetricsData(ctx context.Context, md data.MetricsData) error {
var errs []error
for _, mdp := range mdps {
if err := mdp.ProcessMetricsData(ctx, md); err != nil {
errs = append(errs, err)
}
}
return internal.CombineErrors(errs)
}
// NewMultiTraceDataProcessor wraps multiple trace exporters in a single one.
func NewMultiTraceDataProcessor(tdps []TraceDataProcessor) TraceDataProcessor {
return traceDataProcessors(tdps)
}
type traceDataProcessors []TraceDataProcessor
var _ TraceDataProcessor = (*traceDataProcessors)(nil)
// ExportSpans exports the span data to all trace exporters wrapped by the current one.
func (tdps traceDataProcessors) ProcessTraceData(ctx context.Context, td data.TraceData) error {
var errs []error
for _, tdp := range tdps {
if err := tdp.ProcessTraceData(ctx, td); err != nil {
errs = append(errs, err)
}
}
return internal.CombineErrors(errs)
}

View File

@ -0,0 +1,64 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multiconsumer
import (
"context"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/processor"
)
// NewMetricsProcessor wraps multiple metrics consumers in a single one.
func NewMetricsProcessor(mcs []consumer.MetricsConsumer) processor.MetricsProcessor {
return metricsConsumers(mcs)
}
type metricsConsumers []consumer.MetricsConsumer
var _ processor.MetricsProcessor = (*metricsConsumers)(nil)
// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mcs metricsConsumers) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
var errs []error
for _, mdp := range mcs {
if err := mdp.ConsumeMetricsData(ctx, md); err != nil {
errs = append(errs, err)
}
}
return internal.CombineErrors(errs)
}
// NewTraceProcessor wraps multiple trace consumers in a single one.
func NewTraceProcessor(tcs []consumer.TraceConsumer) processor.TraceProcessor {
return traceConsumers(tcs)
}
type traceConsumers []consumer.TraceConsumer
var _ processor.TraceProcessor = (*traceConsumers)(nil)
// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
func (tcs traceConsumers) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
var errs []error
for _, tdp := range tcs {
if err := tdp.ConsumeTraceData(ctx, td); err != nil {
errs = append(errs, err)
}
}
return internal.CombineErrors(errs)
}

View File

@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package processor package multiconsumer
import ( import (
"context" "context"
@ -20,16 +20,17 @@ import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
) )
func TestMultiTraceDataProcessorMultiplexing(t *testing.T) { func TestTraceProcessorMultiplexing(t *testing.T) {
processors := make([]TraceDataProcessor, 3) processors := make([]consumer.TraceConsumer, 3)
for i := range processors { for i := range processors {
processors[i] = &mockTraceDataProcessor{} processors[i] = &mockTraceConsumer{}
} }
mtdp := NewMultiTraceDataProcessor(processors) tdp := NewTraceProcessor(processors)
td := data.TraceData{ td := data.TraceData{
Spans: make([]*tracepb.Span, 7), Spans: make([]*tracepb.Span, 7),
} }
@ -37,7 +38,7 @@ func TestMultiTraceDataProcessorMultiplexing(t *testing.T) {
var wantSpansCount = 0 var wantSpansCount = 0
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wantSpansCount += len(td.Spans) wantSpansCount += len(td.Spans)
err := mtdp.ProcessTraceData(context.Background(), td) err := tdp.ConsumeTraceData(context.Background(), td)
if err != nil { if err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
@ -45,7 +46,7 @@ func TestMultiTraceDataProcessorMultiplexing(t *testing.T) {
} }
for _, p := range processors { for _, p := range processors {
m := p.(*mockTraceDataProcessor) m := p.(*mockTraceConsumer)
if m.TotalSpans != wantSpansCount { if m.TotalSpans != wantSpansCount {
t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans)
return return
@ -53,16 +54,16 @@ func TestMultiTraceDataProcessorMultiplexing(t *testing.T) {
} }
} }
func TestMultiTraceDataProcessorWhenOneErrors(t *testing.T) { func TestTraceProcessorWhenOneErrors(t *testing.T) {
processors := make([]TraceDataProcessor, 3) processors := make([]consumer.TraceConsumer, 3)
for i := range processors { for i := range processors {
processors[i] = &mockTraceDataProcessor{} processors[i] = &mockTraceConsumer{}
} }
// Make one processor return error // Make one processor return error
processors[1].(*mockTraceDataProcessor).MustFail = true processors[1].(*mockTraceConsumer).MustFail = true
mtdp := NewMultiTraceDataProcessor(processors) tdp := NewTraceProcessor(processors)
td := data.TraceData{ td := data.TraceData{
Spans: make([]*tracepb.Span, 5), Spans: make([]*tracepb.Span, 5),
} }
@ -70,7 +71,7 @@ func TestMultiTraceDataProcessorWhenOneErrors(t *testing.T) {
var wantSpansCount = 0 var wantSpansCount = 0
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wantSpansCount += len(td.Spans) wantSpansCount += len(td.Spans)
err := mtdp.ProcessTraceData(context.Background(), td) err := tdp.ConsumeTraceData(context.Background(), td)
if err == nil { if err == nil {
t.Errorf("Wanted error got nil") t.Errorf("Wanted error got nil")
return return
@ -78,7 +79,7 @@ func TestMultiTraceDataProcessorWhenOneErrors(t *testing.T) {
} }
for _, p := range processors { for _, p := range processors {
m := p.(*mockTraceDataProcessor) m := p.(*mockTraceConsumer)
if m.TotalSpans != wantSpansCount { if m.TotalSpans != wantSpansCount {
t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans)
return return
@ -86,13 +87,13 @@ func TestMultiTraceDataProcessorWhenOneErrors(t *testing.T) {
} }
} }
func TestMultiMetricsDataProcessorMultiplexing(t *testing.T) { func TestMetricsProcessorMultiplexing(t *testing.T) {
processors := make([]MetricsDataProcessor, 3) processors := make([]consumer.MetricsConsumer, 3)
for i := range processors { for i := range processors {
processors[i] = &mockMetricsDataProcessor{} processors[i] = &mockMetricsConsumer{}
} }
mmdp := NewMultiMetricsDataProcessor(processors) mdp := NewMetricsProcessor(processors)
md := data.MetricsData{ md := data.MetricsData{
Metrics: make([]*metricspb.Metric, 7), Metrics: make([]*metricspb.Metric, 7),
} }
@ -100,7 +101,7 @@ func TestMultiMetricsDataProcessorMultiplexing(t *testing.T) {
var wantMetricsCount = 0 var wantMetricsCount = 0
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wantMetricsCount += len(md.Metrics) wantMetricsCount += len(md.Metrics)
err := mmdp.ProcessMetricsData(context.Background(), md) err := mdp.ConsumeMetricsData(context.Background(), md)
if err != nil { if err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
@ -108,7 +109,7 @@ func TestMultiMetricsDataProcessorMultiplexing(t *testing.T) {
} }
for _, p := range processors { for _, p := range processors {
m := p.(*mockMetricsDataProcessor) m := p.(*mockMetricsConsumer)
if m.TotalMetrics != wantMetricsCount { if m.TotalMetrics != wantMetricsCount {
t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics) t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics)
return return
@ -116,16 +117,16 @@ func TestMultiMetricsDataProcessorMultiplexing(t *testing.T) {
} }
} }
func TestMultiMetricsDataProcessorWhenOneErrors(t *testing.T) { func TestMetricsProcessorWhenOneErrors(t *testing.T) {
processors := make([]MetricsDataProcessor, 3) processors := make([]consumer.MetricsConsumer, 3)
for i := range processors { for i := range processors {
processors[i] = &mockMetricsDataProcessor{} processors[i] = &mockMetricsConsumer{}
} }
// Make one processor return error // Make one processor return error
processors[1].(*mockMetricsDataProcessor).MustFail = true processors[1].(*mockMetricsConsumer).MustFail = true
mmdp := NewMultiMetricsDataProcessor(processors) mdp := NewMetricsProcessor(processors)
md := data.MetricsData{ md := data.MetricsData{
Metrics: make([]*metricspb.Metric, 5), Metrics: make([]*metricspb.Metric, 5),
} }
@ -133,7 +134,7 @@ func TestMultiMetricsDataProcessorWhenOneErrors(t *testing.T) {
var wantMetricsCount = 0 var wantMetricsCount = 0
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wantMetricsCount += len(md.Metrics) wantMetricsCount += len(md.Metrics)
err := mmdp.ProcessMetricsData(context.Background(), md) err := mdp.ConsumeMetricsData(context.Background(), md)
if err == nil { if err == nil {
t.Errorf("Wanted error got nil") t.Errorf("Wanted error got nil")
return return
@ -141,7 +142,7 @@ func TestMultiMetricsDataProcessorWhenOneErrors(t *testing.T) {
} }
for _, p := range processors { for _, p := range processors {
m := p.(*mockMetricsDataProcessor) m := p.(*mockMetricsConsumer)
if m.TotalMetrics != wantMetricsCount { if m.TotalMetrics != wantMetricsCount {
t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics) t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics)
return return
@ -149,14 +150,14 @@ func TestMultiMetricsDataProcessorWhenOneErrors(t *testing.T) {
} }
} }
type mockTraceDataProcessor struct { type mockTraceConsumer struct {
TotalSpans int TotalSpans int
MustFail bool MustFail bool
} }
var _ TraceDataProcessor = &mockTraceDataProcessor{} var _ consumer.TraceConsumer = &mockTraceConsumer{}
func (p *mockTraceDataProcessor) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (p *mockTraceConsumer) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
p.TotalSpans += len(td.Spans) p.TotalSpans += len(td.Spans)
if p.MustFail { if p.MustFail {
return fmt.Errorf("this processor must fail") return fmt.Errorf("this processor must fail")
@ -165,14 +166,14 @@ func (p *mockTraceDataProcessor) ProcessTraceData(ctx context.Context, td data.T
return nil return nil
} }
type mockMetricsDataProcessor struct { type mockMetricsConsumer struct {
TotalMetrics int TotalMetrics int
MustFail bool MustFail bool
} }
var _ MetricsDataProcessor = &mockMetricsDataProcessor{} var _ consumer.MetricsConsumer = &mockMetricsConsumer{}
func (p *mockMetricsDataProcessor) ProcessMetricsData(ctx context.Context, td data.MetricsData) error { func (p *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, td data.MetricsData) error {
p.TotalMetrics += len(td.Metrics) p.TotalMetrics += len(td.Metrics)
if p.MustFail { if p.MustFail {
return fmt.Errorf("this processor must fail") return fmt.Errorf("this processor must fail")

View File

@ -15,23 +15,19 @@
package processor package processor
import ( import (
"context" "github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data"
) )
// MetricsDataProcessor is an interface that receives data.MetricsData, process it as needed, and // TraceProcessor composes TraceConsumer with some additional processor-specific functions.
// sends it to the next processing node if any or to the destination. type TraceProcessor interface {
// consumer.TraceConsumer
// ProcessMetricsData receives data.MetricsData for processing by the MetricsDataProcessor.
type MetricsDataProcessor interface { // TODO: Add processor specific functions.
ProcessMetricsData(ctx context.Context, md data.MetricsData) error
} }
// TraceDataProcessor is an interface that receives data.TraceData, process it as needed, and // MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
// sends it to the next processing node if any or to the destination. type MetricsProcessor interface {
// consumer.MetricsConsumer
// ProcessTraceData receives data.TraceData for processing by the TraceDataProcessor.
type TraceDataProcessor interface { // TODO: Add processor specific functions.
ProcessTraceData(ctx context.Context, td data.TraceData) error
} }

View File

@ -17,32 +17,33 @@ package processortest
import ( import (
"context" "context"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor" "github.com/census-instrumentation/opencensus-service/processor"
) )
type nopProcessor struct { type nopProcessor struct {
nextTraceProcessor processor.TraceDataProcessor nextTraceProcessor consumer.TraceConsumer
nextMetricsProcessor processor.MetricsDataProcessor nextMetricsProcessor consumer.MetricsConsumer
} }
var _ processor.TraceDataProcessor = (*nopProcessor)(nil) var _ processor.TraceProcessor = (*nopProcessor)(nil)
var _ processor.MetricsDataProcessor = (*nopProcessor)(nil) var _ processor.MetricsProcessor = (*nopProcessor)(nil)
func (np *nopProcessor) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (np *nopProcessor) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
return np.nextTraceProcessor.ProcessTraceData(ctx, td) return np.nextTraceProcessor.ConsumeTraceData(ctx, td)
} }
func (np *nopProcessor) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (np *nopProcessor) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
return np.nextMetricsProcessor.ProcessMetricsData(ctx, md) return np.nextMetricsProcessor.ConsumeMetricsData(ctx, md)
} }
// NewNopTraceProcessor creates an TraceDataProcessor that just pass the received data to the nextTraceProcessor. // NewNopTraceProcessor creates an TraceProcessor that just pass the received data to the nextTraceProcessor.
func NewNopTraceProcessor(nextTraceProcessor processor.TraceDataProcessor) processor.TraceDataProcessor { func NewNopTraceProcessor(nextTraceProcessor consumer.TraceConsumer) consumer.TraceConsumer {
return &nopProcessor{nextTraceProcessor: nextTraceProcessor} return &nopProcessor{nextTraceProcessor: nextTraceProcessor}
} }
// NewNopMetricsProcessor creates an MetricsDataProcessor that just pass the received data to the nextMetricsProcessor. // NewNopMetricsProcessor creates an MetricsProcessor that just pass the received data to the nextMetricsProcessor.
func NewNopMetricsProcessor(nextMetricsProcessor processor.MetricsDataProcessor) processor.MetricsDataProcessor { func NewNopMetricsProcessor(nextMetricsProcessor consumer.MetricsConsumer) consumer.MetricsConsumer {
return &nopProcessor{nextMetricsProcessor: nextMetricsProcessor} return &nopProcessor{nextMetricsProcessor: nextMetricsProcessor}
} }

View File

@ -24,13 +24,13 @@ import (
"github.com/census-instrumentation/opencensus-service/exporter/exportertest" "github.com/census-instrumentation/opencensus-service/exporter/exportertest"
) )
func TestNopTraceDataProcessorNoErrors(t *testing.T) { func TestNopTraceProcessorNoErrors(t *testing.T) {
sink := new(exportertest.SinkTraceExporter) sink := new(exportertest.SinkTraceExporter)
ntp := NewNopTraceProcessor(sink) ntp := NewNopTraceProcessor(sink)
want := data.TraceData{ want := data.TraceData{
Spans: make([]*tracepb.Span, 7), Spans: make([]*tracepb.Span, 7),
} }
if err := ntp.ProcessTraceData(context.Background(), want); err != nil { if err := ntp.ConsumeTraceData(context.Background(), want); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }
@ -40,13 +40,13 @@ func TestNopTraceDataProcessorNoErrors(t *testing.T) {
} }
} }
func TestNopMetricsDataProcessorNoErrors(t *testing.T) { func TestNopMetricsProcessorNoErrors(t *testing.T) {
sink := new(exportertest.SinkMetricsExporter) sink := new(exportertest.SinkMetricsExporter)
nmp := NewNopMetricsProcessor(sink) nmp := NewNopMetricsProcessor(sink)
want := data.MetricsData{ want := data.MetricsData{
Metrics: make([]*metricspb.Metric, 7), Metrics: make([]*metricspb.Metric, 7),
} }
if err := nmp.ProcessMetricsData(context.Background(), want); err != nil { if err := nmp.ConsumeMetricsData(context.Background(), want); err != nil {
t.Errorf("Wanted nil got error") t.Errorf("Wanted nil got error")
return return
} }

View File

@ -15,7 +15,7 @@
package receiver package receiver
import ( import (
"github.com/census-instrumentation/opencensus-service/processor" "github.com/census-instrumentation/opencensus-service/consumer"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -25,8 +25,8 @@ type TraceReceiverFactory interface {
// Type gets the type of the TraceReceiver created by this factory. // Type gets the type of the TraceReceiver created by this factory.
Type() string Type() string
// NewFromViper takes a viper.Viper config and creates a new TraceReceiver which uses next as the // NewFromViper takes a viper.Viper config and creates a new TraceReceiver which uses next as the
// next TraceDataProcessor in the pipeline. // next TraceConsumer in the pipeline.
NewFromViper(cfg *viper.Viper, next processor.TraceDataProcessor) (TraceReceiver, error) NewFromViper(cfg *viper.Viper, next consumer.TraceConsumer) (TraceReceiver, error)
// DefaultConfig returns the default configuration for TraceReceivers // DefaultConfig returns the default configuration for TraceReceivers
// created by this factory. // created by this factory.
DefaultConfig() *viper.Viper DefaultConfig() *viper.Viper
@ -38,8 +38,8 @@ type MetricsReceiverFactory interface {
// Type gets the type of the MetricsReceiver created by this factory. // Type gets the type of the MetricsReceiver created by this factory.
Type() string Type() string
// NewFromViper takes a viper.Viper config and creates a new MetricsReceiver which uses next as the // NewFromViper takes a viper.Viper config and creates a new MetricsReceiver which uses next as the
// next MetricsDataProcessor in the pipeline. // next MetricsConsumer in the pipeline.
NewFromViper(cfg *viper.Viper, next processor.MetricsDataProcessor) (MetricsReceiver, error) NewFromViper(cfg *viper.Viper, next consumer.MetricsConsumer) (MetricsReceiver, error)
// DefaultConfig returns the default configuration for MetricsReceivers // DefaultConfig returns the default configuration for MetricsReceivers
// created by this factory. // created by this factory.
DefaultConfig() *viper.Viper DefaultConfig() *viper.Viper

View File

@ -37,8 +37,8 @@ import (
"github.com/uber/tchannel-go/thrift" "github.com/uber/tchannel-go/thrift"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver" "github.com/census-instrumentation/opencensus-service/receiver"
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger" jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
) )
@ -60,7 +60,7 @@ type jReceiver struct {
// mu protects the fields of this type // mu protects the fields of this type
mu sync.Mutex mu sync.Mutex
nextProcessor processor.TraceDataProcessor nextConsumer consumer.TraceConsumer
startOnce sync.Once startOnce sync.Once
stopOnce sync.Once stopOnce sync.Once
@ -170,7 +170,7 @@ func (jr *jReceiver) TraceSource() string {
return traceSource return traceSource
} }
func (jr *jReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error { func (jr *jReceiver) StartTraceReception(ctx context.Context, nextConsumer consumer.TraceConsumer) error {
jr.mu.Lock() jr.mu.Lock()
defer jr.mu.Unlock() defer jr.mu.Unlock()
@ -186,8 +186,8 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, nextProcessor proc
return return
} }
// Finally set the nextProcessor, since we never encountered an error. // Finally set the nextConsumer, since we never encountered an error.
jr.nextProcessor = nextProcessor jr.nextConsumer = nextConsumer
err = nil err = nil
}) })
@ -249,7 +249,7 @@ func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch)
if err == nil { if err == nil {
ok = true ok = true
jr.nextProcessor.ProcessTraceData(ctx, td) jr.nextConsumer.ConsumeTraceData(ctx, td)
// We MUST unconditionally record metrics from this reception. // We MUST unconditionally record metrics from this reception.
observability.RecordTraceReceiverMetrics(ctxWithReceiverName, len(batch.Spans), len(batch.Spans)-len(td.Spans)) observability.RecordTraceReceiverMetrics(ctxWithReceiverName, len(batch.Spans), len(batch.Spans)-len(td.Spans))
} }
@ -279,7 +279,7 @@ func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error {
return err return err
} }
err = jr.nextProcessor.ProcessTraceData(jr.defaultAgentCtx, td) err = jr.nextConsumer.ConsumeTraceData(jr.defaultAgentCtx, td)
observability.RecordTraceReceiverMetrics(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans)-len(td.Spans)) observability.RecordTraceReceiverMetrics(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans)-len(td.Spans))
return err return err

View File

@ -27,24 +27,24 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
) )
// Receiver is the type used to handle metrics from OpenCensus exporters. // Receiver is the type used to handle metrics from OpenCensus exporters.
type Receiver struct { type Receiver struct {
nextProcessor processor.MetricsDataProcessor nextConsumer consumer.MetricsConsumer
metricBufferPeriod time.Duration metricBufferPeriod time.Duration
metricBufferCount int metricBufferCount int
} }
// New creates a new ocmetrics.Receiver reference. // New creates a new ocmetrics.Receiver reference.
func New(nextProcessor processor.MetricsDataProcessor, opts ...Option) (*Receiver, error) { func New(nextConsumer consumer.MetricsConsumer, opts ...Option) (*Receiver, error) {
if nextProcessor == nil { if nextConsumer == nil {
return nil, errors.New("needs a non-nil processor.MetricsDataProcessor") return nil, errors.New("needs a non-nil consumer.MetricsConsumer")
} }
ocr := &Receiver{nextProcessor: nextProcessor} ocr := &Receiver{nextConsumer: nextConsumer}
for _, opt := range opts { for _, opt := range opts {
opt.WithReceiver(ocr) opt.WithReceiver(ocr)
} }
@ -142,7 +142,7 @@ func (ocr *Receiver) batchMetricExporting(longLivedRPCCtx context.Context, paylo
nMetrics := int64(0) nMetrics := int64(0)
for _, md := range mds { for _, md := range mds {
ocr.nextProcessor.ProcessMetricsData(ctx, *md) ocr.nextConsumer.ConsumeMetricsData(ctx, *md)
nMetrics += int64(len(md.Metrics)) nMetrics += int64(len(md.Metrics))
} }

View File

@ -34,10 +34,10 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
) )
// TODO: add E2E tests once ocagent implements metric service client. // TODO: add E2E tests once ocagent implements metric service client.
@ -330,9 +330,9 @@ func newMetricAppender() *metricAppender {
return &metricAppender{metricsPerNode: make(map[*commonpb.Node][]*metricspb.Metric)} return &metricAppender{metricsPerNode: make(map[*commonpb.Node][]*metricspb.Metric)}
} }
var _ processor.MetricsDataProcessor = (*metricAppender)(nil) var _ consumer.MetricsConsumer = (*metricAppender)(nil)
func (sa *metricAppender) ProcessMetricsData(ctx context.Context, md data.MetricsData) error { func (sa *metricAppender) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
sa.Lock() sa.Lock()
defer sa.Unlock() defer sa.Unlock()
@ -341,7 +341,7 @@ func (sa *metricAppender) ProcessMetricsData(ctx context.Context, md data.Metric
return nil return nil
} }
func ocReceiverOnGRPCServer(t *testing.T, sr processor.MetricsDataProcessor, opts ...Option) (oci *Receiver, port int, done func()) { func ocReceiverOnGRPCServer(t *testing.T, sr consumer.MetricsConsumer, opts ...Option) (oci *Receiver, port int, done func()) {
ln, err := net.Listen("tcp", ":0") ln, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
t.Fatalf("Failed to find an available address to run the gRPC server: %v", err) t.Fatalf("Failed to find an available address to run the gRPC server: %v", err)

View File

@ -24,9 +24,9 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
) )
const ( const (
@ -37,10 +37,10 @@ const (
// Receiver is the type used to handle spans from OpenCensus exporters. // Receiver is the type used to handle spans from OpenCensus exporters.
type Receiver struct { type Receiver struct {
nextProcessor processor.TraceDataProcessor nextConsumer consumer.TraceConsumer
numWorkers int numWorkers int
workers []*receiverWorker workers []*receiverWorker
messageChan chan *traceDataWithCtx messageChan chan *traceDataWithCtx
} }
type traceDataWithCtx struct { type traceDataWithCtx struct {
@ -49,16 +49,16 @@ type traceDataWithCtx struct {
} }
// New creates a new opencensus.Receiver reference. // New creates a new opencensus.Receiver reference.
func New(nextProcessor processor.TraceDataProcessor, opts ...Option) (*Receiver, error) { func New(nextConsumer consumer.TraceConsumer, opts ...Option) (*Receiver, error) {
if nextProcessor == nil { if nextConsumer == nil {
return nil, errors.New("needs a non-nil processor.TraceDataProcessor") return nil, errors.New("needs a non-nil consumer.TraceConsumer")
} }
messageChan := make(chan *traceDataWithCtx, messageChannelSize) messageChan := make(chan *traceDataWithCtx, messageChannelSize)
ocr := &Receiver{ ocr := &Receiver{
nextProcessor: nextProcessor, nextConsumer: nextConsumer,
numWorkers: defaultNumWorkers, numWorkers: defaultNumWorkers,
messageChan: messageChan, messageChan: messageChan,
} }
for _, opt := range opts { for _, opt := range opts {
opt(ocr) opt(ocr)
@ -199,7 +199,7 @@ func (rw *receiverWorker) export(longLivedCtx context.Context, tracedata *data.T
// If the starting RPC has a parent span, then add it as a parent link. // If the starting RPC has a parent span, then add it as a parent link.
observability.SetParentLink(longLivedCtx, span) observability.SetParentLink(longLivedCtx, span)
rw.receiver.nextProcessor.ProcessTraceData(ctx, *tracedata) rw.receiver.nextConsumer.ConsumeTraceData(ctx, *tracedata)
span.Annotate([]trace.Attribute{ span.Annotate([]trace.Attribute{
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))), trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))),

View File

@ -36,10 +36,10 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"go.opencensus.io/trace/tracestate" "go.opencensus.io/trace/tracestate"
) )
@ -464,9 +464,9 @@ func newSpanAppender() *spanAppender {
return &spanAppender{spansPerNode: make(map[*commonpb.Node][]*tracepb.Span)} return &spanAppender{spansPerNode: make(map[*commonpb.Node][]*tracepb.Span)}
} }
var _ processor.TraceDataProcessor = (*spanAppender)(nil) var _ consumer.TraceConsumer = (*spanAppender)(nil)
func (sa *spanAppender) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (sa *spanAppender) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
sa.Lock() sa.Lock()
defer sa.Unlock() defer sa.Unlock()
@ -474,7 +474,7 @@ func (sa *spanAppender) ProcessTraceData(ctx context.Context, td data.TraceData)
return nil return nil
} }
func ocReceiverOnGRPCServer(t *testing.T, sr processor.TraceDataProcessor, opts ...Option) (oci *Receiver, port int, done func()) { func ocReceiverOnGRPCServer(t *testing.T, sr consumer.TraceConsumer, opts ...Option) (oci *Receiver, port int, done func()) {
ln, err := net.Listen("tcp", ":0") ln, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
t.Fatalf("Failed to find an available address to run the gRPC server: %v", err) t.Fatalf("Failed to find an available address to run the gRPC server: %v", err)

View File

@ -25,8 +25,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver/ocmetrics" "github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver/ocmetrics"
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver/octrace" "github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver/octrace"
gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
@ -96,15 +96,15 @@ func (ocr *Receiver) TraceSource() string {
// StartTraceReception exclusively runs the Trace receiver on the gRPC server. // StartTraceReception exclusively runs the Trace receiver on the gRPC server.
// To start both Trace and Metrics receivers/services, please use Start. // To start both Trace and Metrics receivers/services, please use Start.
func (ocr *Receiver) StartTraceReception(ctx context.Context, ts processor.TraceDataProcessor) error { func (ocr *Receiver) StartTraceReception(ctx context.Context, ts consumer.TraceConsumer) error {
err := ocr.registerTraceDataProcessor(ts) err := ocr.registerTraceConsumer(ts)
if err != nil && err != errAlreadyStarted { if err != nil && err != errAlreadyStarted {
return err return err
} }
return ocr.startServer() return ocr.startServer()
} }
func (ocr *Receiver) registerTraceDataProcessor(ts processor.TraceDataProcessor) error { func (ocr *Receiver) registerTraceConsumer(ts consumer.TraceConsumer) error {
var err = errAlreadyStarted var err = errAlreadyStarted
ocr.startTraceReceiverOnce.Do(func() { ocr.startTraceReceiverOnce.Do(func() {
@ -125,15 +125,15 @@ func (ocr *Receiver) MetricsSource() string {
// StartMetricsReception exclusively runs the Metrics receiver on the gRPC server. // StartMetricsReception exclusively runs the Metrics receiver on the gRPC server.
// To start both Trace and Metrics receivers/services, please use Start. // To start both Trace and Metrics receivers/services, please use Start.
func (ocr *Receiver) StartMetricsReception(ctx context.Context, ms processor.MetricsDataProcessor) error { func (ocr *Receiver) StartMetricsReception(ctx context.Context, ms consumer.MetricsConsumer) error {
err := ocr.registerMetricsDataProcessor(ms) err := ocr.registerMetricsConsumer(ms)
if err != nil && err != errAlreadyStarted { if err != nil && err != errAlreadyStarted {
return err return err
} }
return ocr.startServer() return ocr.startServer()
} }
func (ocr *Receiver) registerMetricsDataProcessor(ms processor.MetricsDataProcessor) error { func (ocr *Receiver) registerMetricsConsumer(ms consumer.MetricsConsumer) error {
var err = errAlreadyStarted var err = errAlreadyStarted
ocr.startMetricsReceiverOnce.Do(func() { ocr.startMetricsReceiverOnce.Do(func() {
@ -179,11 +179,11 @@ func (ocr *Receiver) StopMetricsReception(ctx context.Context) error {
} }
// Start runs all the receivers/services namely, Trace and Metrics services. // Start runs all the receivers/services namely, Trace and Metrics services.
func (ocr *Receiver) Start(ctx context.Context, ts processor.TraceDataProcessor, ms processor.MetricsDataProcessor) error { func (ocr *Receiver) Start(ctx context.Context, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) error {
if err := ocr.registerTraceDataProcessor(ts); err != nil && err != errAlreadyStarted { if err := ocr.registerTraceConsumer(tc); err != nil && err != errAlreadyStarted {
return err return err
} }
if err := ocr.registerMetricsDataProcessor(ms); err != nil && err != errAlreadyStarted { if err := ocr.registerMetricsConsumer(mc); err != nil && err != errAlreadyStarted {
return err return err
} }

View File

@ -25,8 +25,8 @@ import (
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver" "github.com/census-instrumentation/opencensus-service/receiver"
"github.com/orijtech/promreceiver" "github.com/orijtech/promreceiver"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -97,15 +97,15 @@ func (pr *Preceiver) MetricsSource() string {
// StartMetricsReception is the method that starts Prometheus scraping and it // StartMetricsReception is the method that starts Prometheus scraping and it
// is controlled by having previously defined a Configuration using perhaps New. // is controlled by having previously defined a Configuration using perhaps New.
func (pr *Preceiver) StartMetricsReception(ctx context.Context, nextProcessor processor.MetricsDataProcessor) error { func (pr *Preceiver) StartMetricsReception(ctx context.Context, nextConsumer consumer.MetricsConsumer) error {
var err = errAlreadyStarted var err = errAlreadyStarted
pr.startOnce.Do(func() { pr.startOnce.Do(func() {
if nextProcessor == nil { if nextConsumer == nil {
err = errNilMetricsReceiverSink err = errNilMetricsReceiverSink
return return
} }
tms := &promMetricsReceiverToOpenCensusMetricsReceiver{nextProcessor: nextProcessor} tms := &promMetricsReceiverToOpenCensusMetricsReceiver{nextConsumer: nextConsumer}
cfg := pr.cfg cfg := pr.cfg
pr.recv, err = promreceiver.ReceiverFromConfig( pr.recv, err = promreceiver.ReceiverFromConfig(
context.Background(), context.Background(),
@ -118,7 +118,7 @@ func (pr *Preceiver) StartMetricsReception(ctx context.Context, nextProcessor pr
} }
// Flush triggers the Flush method on the underlying Prometheus scrapers and instructs // Flush triggers the Flush method on the underlying Prometheus scrapers and instructs
// them to immediately sned over the metrics they've collected, to the MetricsDataProcessor. // them to immediately sned over the metrics they've collected, to the MetricsConsumer.
func (pr *Preceiver) Flush() { func (pr *Preceiver) Flush() {
pr.recv.Flush() pr.recv.Flush()
} }
@ -130,7 +130,7 @@ func (pr *Preceiver) StopMetricsReception(ctx context.Context) error {
} }
type promMetricsReceiverToOpenCensusMetricsReceiver struct { type promMetricsReceiverToOpenCensusMetricsReceiver struct {
nextProcessor processor.MetricsDataProcessor nextConsumer consumer.MetricsConsumer
} }
var _ promreceiver.MetricsSink = (*promMetricsReceiverToOpenCensusMetricsReceiver)(nil) var _ promreceiver.MetricsSink = (*promMetricsReceiverToOpenCensusMetricsReceiver)(nil)
@ -143,7 +143,7 @@ func (pmrtomr *promMetricsReceiverToOpenCensusMetricsReceiver) ReceiveMetrics(ct
return errNilRequest return errNilRequest
} }
err := pmrtomr.nextProcessor.ProcessMetricsData(ctx, data.MetricsData{ err := pmrtomr.nextConsumer.ConsumeMetricsData(ctx, data.MetricsData{
Node: ereq.Node, Node: ereq.Node,
Resource: ereq.Resource, Resource: ereq.Resource,
Metrics: ereq.Metrics, Metrics: ereq.Metrics,

View File

@ -17,14 +17,14 @@ package receiver
import ( import (
"context" "context"
"github.com/census-instrumentation/opencensus-service/consumer"
_ "github.com/census-instrumentation/opencensus-service/internal/compression/grpc" // load in supported grpc compression encodings _ "github.com/census-instrumentation/opencensus-service/internal/compression/grpc" // load in supported grpc compression encodings
"github.com/census-instrumentation/opencensus-service/processor"
) )
// A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter. // A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter.
// Its purpose is to translate data from the wild into trace proto accompanied // Its purpose is to translate data from the wild into trace proto accompanied
// by a *commonpb.Node to uniquely identify where that data comes from. // by a *commonpb.Node to uniquely identify where that data comes from.
// TraceReceiver feeds a processor.TraceDataProcessor with data. // TraceReceiver feeds a consumer.TraceConsumer with data.
// //
// For example it could be Zipkin data source which translates // For example it could be Zipkin data source which translates
// Zipkin spans into *tracepb.Span-s. // Zipkin spans into *tracepb.Span-s.
@ -33,7 +33,7 @@ type TraceReceiver interface {
TraceSource() string TraceSource() string
// StartTraceReception tells the receiver to start its processing. // StartTraceReception tells the receiver to start its processing.
StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error StartTraceReception(ctx context.Context, nextConsumer consumer.TraceConsumer) error
// StopTraceReception tells the receiver that should stop reception, // StopTraceReception tells the receiver that should stop reception,
// giving it a chance to perform any necessary clean-up. // giving it a chance to perform any necessary clean-up.
@ -43,7 +43,7 @@ type TraceReceiver interface {
// A MetricsReceiver is an "arbitrary data"-to-"metric proto" converter. // A MetricsReceiver is an "arbitrary data"-to-"metric proto" converter.
// Its purpose is to translate data from the wild into metric proto accompanied // Its purpose is to translate data from the wild into metric proto accompanied
// by a *commonpb.Node to uniquely identify where that data comes from. // by a *commonpb.Node to uniquely identify where that data comes from.
// MetricsReceiver feeds a processor.MetricsDataProcessor with data. // MetricsReceiver feeds a consumer.MetricsConsumer with data.
// //
// For example it could be Prometheus data source which translates // For example it could be Prometheus data source which translates
// Prometheus metrics into *metricpb.Metric-s. // Prometheus metrics into *metricpb.Metric-s.
@ -52,7 +52,7 @@ type MetricsReceiver interface {
MetricsSource() string MetricsSource() string
// StartMetricsReception tells the receiver to start its processing. // StartMetricsReception tells the receiver to start its processing.
StartMetricsReception(ctx context.Context, nextProcessor processor.MetricsDataProcessor) error StartMetricsReception(ctx context.Context, nextConsumer consumer.MetricsConsumer) error
// StopMetricsReception tells the receiver that should stop reception, // StopMetricsReception tells the receiver that should stop reception,
// giving it a chance to perform any necessary clean-up. // giving it a chance to perform any necessary clean-up.

View File

@ -26,8 +26,8 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
"github.com/omnition/scribe-go/if/scribe/gen-go/scribe" "github.com/omnition/scribe-go/if/scribe/gen-go/scribe"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver" "github.com/census-instrumentation/opencensus-service/receiver"
zipkintranslator "github.com/census-instrumentation/opencensus-service/translator/trace/zipkin" zipkintranslator "github.com/census-instrumentation/opencensus-service/translator/trace/zipkin"
) )
@ -73,18 +73,18 @@ func (r *scribeReceiver) TraceSource() string {
return traceSource return traceSource
} }
func (r *scribeReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error { func (r *scribeReceiver) StartTraceReception(ctx context.Context, nextConsumer consumer.TraceConsumer) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if nextProcessor == nil { if nextConsumer == nil {
return errors.New("trace reception requires a non-nil destination") return errors.New("trace reception requires a non-nil destination")
} }
err := errAlreadyStarted err := errAlreadyStarted
r.startOnce.Do(func() { r.startOnce.Do(func() {
err = nil err = nil
r.collector.nextProcessor = nextProcessor r.collector.nextConsumer = nextConsumer
serverSocket, sockErr := thrift.NewTServerSocket(r.addr + ":" + strconv.Itoa(int(r.port))) serverSocket, sockErr := thrift.NewTServerSocket(r.addr + ":" + strconv.Itoa(int(r.port)))
if sockErr != nil { if sockErr != nil {
err = sockErr err = sockErr
@ -128,7 +128,7 @@ type scribeCollector struct {
category string category string
msgDecoder *base64.Encoding msgDecoder *base64.Encoding
tBinProtocolFactory *thrift.TBinaryProtocolFactory tBinProtocolFactory *thrift.TBinaryProtocolFactory
nextProcessor processor.TraceDataProcessor nextConsumer consumer.TraceConsumer
defaultCtx context.Context defaultCtx context.Context
} }
@ -174,7 +174,7 @@ func (sc *scribeCollector) Log(messages []*scribe.LogEntry) (r scribe.ResultCode
tdsSize := 0 tdsSize := 0
for _, td := range tds { for _, td := range tds {
sc.nextProcessor.ProcessTraceData(sc.defaultCtx, td) sc.nextConsumer.ConsumeTraceData(sc.defaultCtx, td)
tdsSize += len(td.Spans) tdsSize += len(td.Spans)
} }

View File

@ -29,8 +29,8 @@ import (
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"github.com/omnition/scribe-go/if/scribe/gen-go/scribe" "github.com/omnition/scribe-go/if/scribe/gen-go/scribe"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/processor"
) )
func TestNonEqualCategoryIsIgnored(t *testing.T) { func TestNonEqualCategoryIsIgnored(t *testing.T) {
@ -154,9 +154,9 @@ func newMockTraceSink(numReceiveTraceDataCount int) *mockTraceSink {
} }
} }
var _ processor.TraceDataProcessor = (*mockTraceSink)(nil) var _ consumer.TraceConsumer = (*mockTraceSink)(nil)
func (m *mockTraceSink) ProcessTraceData(ctx context.Context, td data.TraceData) error { func (m *mockTraceSink) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
m.receivedData = append(m.receivedData, td) m.receivedData = append(m.receivedData, td)
m.wg.Done() m.wg.Done()
return nil return nil

View File

@ -37,10 +37,10 @@ import (
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/observability" "github.com/census-instrumentation/opencensus-service/observability"
"github.com/census-instrumentation/opencensus-service/processor"
"github.com/census-instrumentation/opencensus-service/receiver" "github.com/census-instrumentation/opencensus-service/receiver"
tracetranslator "github.com/census-instrumentation/opencensus-service/translator/trace" tracetranslator "github.com/census-instrumentation/opencensus-service/translator/trace"
zipkintranslator "github.com/census-instrumentation/opencensus-service/translator/trace/zipkin" zipkintranslator "github.com/census-instrumentation/opencensus-service/translator/trace/zipkin"
@ -54,7 +54,7 @@ type ZipkinReceiver struct {
// addr is the address onto which the HTTP server will be bound // addr is the address onto which the HTTP server will be bound
addr string addr string
nextProcessor processor.TraceDataProcessor nextConsumer consumer.TraceConsumer
startOnce sync.Once startOnce sync.Once
stopOnce sync.Once stopOnce sync.Once
@ -93,7 +93,7 @@ func (zr *ZipkinReceiver) TraceSource() string {
} }
// StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing. // StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing.
func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error { func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, nextConsumer consumer.TraceConsumer) error {
zr.mu.Lock() zr.mu.Lock()
defer zr.mu.Unlock() defer zr.mu.Unlock()
@ -111,7 +111,7 @@ func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, nextProcessor
_ = server.Serve(ln) _ = server.Serve(ln)
}() }()
zr.nextProcessor = nextProcessor zr.nextConsumer = nextConsumer
zr.server = server zr.server = server
err = nil err = nil
@ -278,7 +278,7 @@ const (
) )
// The ZipkinReceiver receives spans from endpoint /api/v2 as JSON, // The ZipkinReceiver receives spans from endpoint /api/v2 as JSON,
// unmarshals them and sends them along to the nextProcessor. // unmarshals them and sends them along to the nextConsumer.
func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Trace this method // Trace this method
ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export") ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export")
@ -322,7 +322,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue) ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue)
tdsSize := 0 tdsSize := 0
for _, td := range tds { for _, td := range tds {
zr.nextProcessor.ProcessTraceData(ctxWithReceiverName, td) zr.nextConsumer.ConsumeTraceData(ctxWithReceiverName, td)
tdsSize += len(td.Spans) tdsSize += len(td.Spans)
} }

View File

@ -225,7 +225,7 @@ func TestConversionRoundtrip(t *testing.T) {
} }
}]`) }]`)
zi := &ZipkinReceiver{nextProcessor: exportertest.NewNopTraceExporter()} zi := &ZipkinReceiver{nextConsumer: exportertest.NewNopTraceExporter()}
ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil) ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil)
if err != nil { if err != nil {
t.Fatalf("Failed to parse and convert receiver JSON: %v", err) t.Fatalf("Failed to parse and convert receiver JSON: %v", err)