diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index 13e1f04f5..a156111c8 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -5,7 +5,6 @@ import ( "io" "io/ioutil" "testing" - "time" "github.com/stretchr/testify/require" @@ -15,9 +14,6 @@ import ( "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/exporter/metric/stdout" metrictest "go.opentelemetry.io/otel/internal/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func TestDirect(t *testing.T) { @@ -207,26 +203,13 @@ func TestDefaultSDK(t *testing.T) { counter.Add(ctx, 1, labels1) in, out := io.Pipe() - // TODO this should equal a stdout.NewPipeline(), use it. - // Consider also moving the io.Pipe() and go func() call - // below into a test helper somewhere. - sdk := func(options stdout.Options) *push.Controller { - selector := simple.NewWithInexpensiveMeasure() - exporter, err := stdout.New(options) - if err != nil { - panic(err) - } - batcher := ungrouped.New(selector, true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - - return pusher - }(stdout.Options{ + pusher, err := stdout.InstallNewPipeline(stdout.Options{ Writer: out, DoNotPrintTime: true, }) - - global.SetMeterProvider(sdk) + if err != nil { + panic(err) + } counter.Add(ctx, 1, labels1) @@ -236,9 +219,9 @@ func TestDefaultSDK(t *testing.T) { ch <- string(data) }() - sdk.Stop() + pusher.Stop() out.Close() - require.Equal(t, `{"updates":[{"name":"test.builtin{A=B}","sum":1}]} + require.Equal(t, `{"updates":[{"name":"test.builtin","sum":1}]} `, <-ch) } diff --git a/example/basic/main.go b/example/basic/main.go index 0a69cdfa6..ccd6c0613 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -17,7 +17,6 @@ package main import ( "context" "log" - "time" "go.opentelemetry.io/otel/api/distributedcontext" "go.opentelemetry.io/otel/api/global" @@ -26,10 +25,7 @@ import ( "go.opentelemetry.io/otel/api/trace" metricstdout "go.opentelemetry.io/otel/exporter/metric/stdout" tracestdout "go.opentelemetry.io/otel/exporter/trace/stdout" - metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -57,19 +53,13 @@ func initTracer() { } func initMeter() *push.Controller { - selector := simple.NewWithExactMeasure() - exporter, err := metricstdout.New(metricstdout.Options{ + pusher, err := metricstdout.InstallNewPipeline(metricstdout.Options{ Quantiles: []float64{0.5, 0.9, 0.99}, PrettyPrint: false, }) if err != nil { log.Panicf("failed to initialize metric stdout exporter %v", err) } - batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - - global.SetMeterProvider(pusher) return pusher } diff --git a/example/prometheus/main.go b/example/prometheus/main.go index d625f0a26..e5f5becf1 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -24,10 +24,7 @@ import ( "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/prometheus" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) var ( @@ -37,29 +34,15 @@ var ( ) func initMeter() *push.Controller { - selector := simple.NewWithExactMeasure() - exporter, err := prometheus.NewExporter(prometheus.Options{}) - + pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{}) if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) + log.Panicf("failed to initialize prometheus exporter %v", err) } - // Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters) - // are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. - // - // Prometheus uses this approach to be resilient to scrape failures. - // If a Prometheus server tries to scrape metrics from a host and fails for some reason, - // it could try again on the next scrape and no data would be lost, only resolution. - // - // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. - batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - + http.HandleFunc("/", hf) go func() { - _ = http.ListenAndServe(":2222", exporter) + _ = http.ListenAndServe(":2222", nil) }() - global.SetMeterProvider(pusher) return pusher } diff --git a/exporter/metric/dogstatsd/dogstatsd.go b/exporter/metric/dogstatsd/dogstatsd.go index 01c71a659..b61c69303 100644 --- a/exporter/metric/dogstatsd/dogstatsd.go +++ b/exporter/metric/dogstatsd/dogstatsd.go @@ -16,13 +16,19 @@ package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd" import ( "bytes" + "time" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) type ( - Config = statsd.Config + Options = statsd.Options // Exporter implements a dogstatsd-format statsd exporter, // which encodes label sets as independent fields in the @@ -45,20 +51,59 @@ var ( _ export.LabelEncoder = &Exporter{} ) -// New returns a new Dogstatsd-syntax exporter. This type implements -// the metric.LabelEncoder interface, allowing the SDK's unique label -// encoding to be pre-computed for the exporter and stored in the -// LabelSet. -func New(config Config) (*Exporter, error) { +// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline. +// This type implements the metric.LabelEncoder interface, +// allowing the SDK's unique label encoding to be pre-computed +// for the exporter and stored in the LabelSet. +func NewRawExporter(options Options) (*Exporter, error) { exp := &Exporter{ LabelEncoder: statsd.NewLabelEncoder(), } var err error - exp.Exporter, err = statsd.NewExporter(config, exp) + exp.Exporter, err = statsd.NewExporter(options, exp) return exp, err } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, err := dogstatsd.InstallNewPipeline(dogstatsd.Options{...}) +// if err != nil { +// ... +// } +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, error) { + controller, err := NewExportPipeline(options) + if err != nil { + return controller, err + } + global.SetMeterProvider(controller) + return controller, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. +func NewExportPipeline(options Options) (*push.Controller, error) { + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) + if err != nil { + return nil, err + } + + // The ungrouped batcher ensures that the export sees the full + // set of labels as dogstatsd tags. + batcher := ungrouped.New(selector, false) + + // The pusher automatically recognizes that the exporter + // implements the LabelEncoder interface, which ensures the + // export encoding for labels is encoded in the LabelSet. + pusher := push.New(batcher, exporter, time.Hour) + pusher.Start() + + return pusher, nil +} + // AppendName is part of the stats-internal adapter interface. func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) { _, _ = buf.WriteString(rec.Descriptor().Name()) diff --git a/exporter/metric/dogstatsd/dogstatsd_test.go b/exporter/metric/dogstatsd/dogstatsd_test.go index c4fc8609d..b090b3dff 100644 --- a/exporter/metric/dogstatsd/dogstatsd_test.go +++ b/exporter/metric/dogstatsd/dogstatsd_test.go @@ -52,7 +52,7 @@ func TestDogstatsLabels(t *testing.T) { checkpointSet.Add(desc, cagg, key.New("A").String("B")) var buf bytes.Buffer - exp, err := dogstatsd.New(dogstatsd.Config{ + exp, err := dogstatsd.NewRawExporter(dogstatsd.Options{ Writer: &buf, }) require.Nil(t, err) diff --git a/exporter/metric/dogstatsd/example_test.go b/exporter/metric/dogstatsd/example_test.go index d5a031757..47ceceaf4 100644 --- a/exporter/metric/dogstatsd/example_test.go +++ b/exporter/metric/dogstatsd/example_test.go @@ -6,14 +6,10 @@ import ( "io" "log" "sync" - "time" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/dogstatsd" - "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func ExampleNew() { @@ -42,8 +38,7 @@ func ExampleNew() { }() // Create a meter - selector := simple.NewWithExactMeasure() - exporter, err := dogstatsd.New(dogstatsd.Config{ + pusher, err := dogstatsd.NewExportPipeline(dogstatsd.Options{ // The Writer field provides test support. Writer: writer, @@ -54,15 +49,6 @@ func ExampleNew() { if err != nil { log.Fatal("Could not initialize dogstatsd exporter:", err) } - // The ungrouped batcher ensures that the export sees the full - // set of labels as dogstatsd tags. - batcher := ungrouped.New(selector, false) - - // The pusher automatically recognizes that the exporter - // implements the LabelEncoder interface, which ensures the - // export encoding for labels is encoded in the LabelSet. - pusher := push.New(batcher, exporter, time.Hour) - pusher.Start() ctx := context.Background() diff --git a/exporter/metric/internal/statsd/conn.go b/exporter/metric/internal/statsd/conn.go index 186d8c8e3..9998c93f9 100644 --- a/exporter/metric/internal/statsd/conn.go +++ b/exporter/metric/internal/statsd/conn.go @@ -34,8 +34,8 @@ import ( ) type ( - // Config supports common options that apply to statsd exporters. - Config struct { + // Options supports common options that apply to statsd exporters. + Options struct { // URL describes the destination for exporting statsd data. // e.g., udp://host:port // tcp://host:port @@ -57,7 +57,7 @@ type ( // exporters. Exporter struct { adapter Adapter - config Config + options Options conn net.Conn writer io.Writer buffer bytes.Buffer @@ -88,17 +88,17 @@ var ( // NewExport returns a common implementation for exporters that Export // statsd syntax. -func NewExporter(config Config, adapter Adapter) (*Exporter, error) { - if config.MaxPacketSize <= 0 { - config.MaxPacketSize = MaxPacketSize +func NewExporter(options Options, adapter Adapter) (*Exporter, error) { + if options.MaxPacketSize <= 0 { + options.MaxPacketSize = MaxPacketSize } var writer io.Writer var conn net.Conn var err error - if config.Writer != nil { - writer = config.Writer + if options.Writer != nil { + writer = options.Writer } else { - conn, err = dial(config.URL) + conn, err = dial(options.URL) if conn != nil { writer = conn } @@ -108,7 +108,7 @@ func NewExporter(config Config, adapter Adapter) (*Exporter, error) { // Start() and Stop() API. return &Exporter{ adapter: adapter, - config: config, + options: options, conn: conn, writer: writer, }, err @@ -171,7 +171,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) return } - if buf.Len() < e.config.MaxPacketSize { + if buf.Len() < e.options.MaxPacketSize { return } if before == 0 { diff --git a/exporter/metric/internal/statsd/conn_test.go b/exporter/metric/internal/statsd/conn_test.go index eb0cd525a..537e3b14d 100644 --- a/exporter/metric/internal/statsd/conn_test.go +++ b/exporter/metric/internal/statsd/conn_test.go @@ -113,11 +113,11 @@ timer.B.D:%s|ms t.Run(nkind.String(), func(t *testing.T) { ctx := context.Background() writer := &testWriter{} - config := statsd.Config{ + options := statsd.Options{ Writer: writer, MaxPacketSize: 1024, } - exp, err := statsd.NewExporter(config, adapter) + exp, err := statsd.NewExporter(options, adapter) if err != nil { t.Fatal("New error: ", err) } @@ -274,12 +274,12 @@ func TestPacketSplit(t *testing.T) { t.Run(tcase.name, func(t *testing.T) { ctx := context.Background() writer := &testWriter{} - config := statsd.Config{ + options := statsd.Options{ Writer: writer, MaxPacketSize: 1024, } adapter := newWithTagsAdapter() - exp, err := statsd.NewExporter(config, adapter) + exp, err := statsd.NewExporter(options, adapter) if err != nil { t.Fatal("New error: ", err) } diff --git a/exporter/metric/prometheus/prometheus.go b/exporter/metric/prometheus/prometheus.go index 05fbd3981..90cc7bc1f 100644 --- a/exporter/metric/prometheus/prometheus.go +++ b/exporter/metric/prometheus/prometheus.go @@ -18,13 +18,19 @@ import ( "context" "fmt" "net/http" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/global" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) // Exporter is an implementation of metric.Exporter that sends metrics to @@ -73,8 +79,9 @@ type Options struct { OnError func(error) } -// NewExporter returns a new prometheus exporter for prometheus metrics. -func NewExporter(opts Options) (*Exporter, error) { +// NewRawExporter returns a new prometheus exporter for prometheus metrics +// for use in a pipeline. +func NewRawExporter(opts Options) (*Exporter, error) { if opts.Registry == nil { opts.Registry = prometheus.NewRegistry() } @@ -108,6 +115,48 @@ func NewExporter(opts Options) (*Exporter, error) { return e, nil } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, hf, err := prometheus.InstallNewPipeline(prometheus.Options{...}) +// if err != nil { +// ... +// } +// http.HandleFunc("/metrics", hf) +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, http.HandlerFunc, error) { + controller, hf, err := NewExportPipeline(options) + if err != nil { + return controller, hf, err + } + global.SetMeterProvider(controller) + return controller, hf, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. +func NewExportPipeline(options Options) (*push.Controller, http.HandlerFunc, error) { + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) + if err != nil { + return nil, nil, err + } + + // Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters) + // are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. + // + // Prometheus uses this approach to be resilient to scrape failures. + // If a Prometheus server tries to scrape metrics from a host and fails for some reason, + // it could try again on the next scrape and no data would be lost, only resolution. + // + // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. + batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) + pusher := push.New(batcher, exporter, time.Second) + pusher.Start() + + return pusher, exporter.ServeHTTP, nil +} + // Export exports the provide metric record to prometheus. func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.snapshot = checkpointSet diff --git a/exporter/metric/prometheus/prometheus_test.go b/exporter/metric/prometheus/prometheus_test.go index 2df58d3f2..602731ee4 100644 --- a/exporter/metric/prometheus/prometheus_test.go +++ b/exporter/metric/prometheus/prometheus_test.go @@ -19,11 +19,11 @@ import ( ) func TestPrometheusExporter(t *testing.T) { - exporter, err := prometheus.NewExporter(prometheus.Options{ + exporter, err := prometheus.NewRawExporter(prometheus.Options{ DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, }) if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) + log.Panicf("failed to initialize prometheus exporter %v", err) } var expected []string diff --git a/exporter/metric/stdout/example_test.go b/exporter/metric/stdout/example_test.go new file mode 100644 index 000000000..ecaaee73b --- /dev/null +++ b/exporter/metric/stdout/example_test.go @@ -0,0 +1,43 @@ +package stdout_test + +import ( + "context" + "log" + + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/exporter/metric/stdout" +) + +func ExampleNewExportPipeline() { + // Create a meter + pusher, err := stdout.NewExportPipeline(stdout.Options{ + PrettyPrint: true, + DoNotPrintTime: true, + }) + if err != nil { + log.Fatal("Could not initialize stdout exporter:", err) + } + defer pusher.Stop() + + ctx := context.Background() + + key := key.New("key") + meter := pusher.Meter("example") + + // Create and update a single counter: + counter := meter.NewInt64Counter("a.counter", metric.WithKeys(key)) + labels := meter.Labels(key.String("value")) + + counter.Add(ctx, 100, labels) + + // Output: + // { + // "updates": [ + // { + // "name": "a.counter{key=value}", + // "sum": 100 + // } + // ] + // } +} diff --git a/exporter/metric/stdout/stdout.go b/exporter/metric/stdout/stdout.go index 357c48e30..332fc7b3a 100644 --- a/exporter/metric/stdout/stdout.go +++ b/exporter/metric/stdout/stdout.go @@ -23,8 +23,14 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/api/global" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) type Exporter struct { @@ -80,7 +86,8 @@ type expoQuantile struct { V interface{} `json:"v"` } -func New(options Options) (*Exporter, error) { +// NewRawExporter creates a stdout Exporter for use in a pipeline. +func NewRawExporter(options Options) (*Exporter, error) { if options.Writer == nil { options.Writer = os.Stdout } @@ -98,6 +105,38 @@ func New(options Options) (*Exporter, error) { }, nil } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, err := stdout.InstallNewPipeline(stdout.Options{...}) +// if err != nil { +// ... +// } +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, error) { + controller, err := NewExportPipeline(options) + if err != nil { + return controller, err + } + global.SetMeterProvider(controller) + return controller, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. +func NewExportPipeline(options Options) (*push.Controller, error) { + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) + if err != nil { + return nil, err + } + batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true) + pusher := push.New(batcher, exporter, time.Second) + pusher.Start() + + return pusher, nil +} + func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { // N.B. Only return one aggError, if any occur. They're likely // to be duplicates of the same error. diff --git a/exporter/metric/stdout/stdout_test.go b/exporter/metric/stdout/stdout_test.go index 1f68c6154..7ae6d4dd4 100644 --- a/exporter/metric/stdout/stdout_test.go +++ b/exporter/metric/stdout/stdout_test.go @@ -36,7 +36,7 @@ func newFixture(t *testing.T, options stdout.Options) testFixture { buf := &bytes.Buffer{} options.Writer = buf options.DoNotPrintTime = true - exp, err := stdout.New(options) + exp, err := stdout.NewRawExporter(options) if err != nil { t.Fatal("Error building fixture: ", err) } @@ -60,7 +60,7 @@ func (fix testFixture) Export(checkpointSet export.CheckpointSet) { } func TestStdoutInvalidQuantile(t *testing.T) { - _, err := stdout.New(stdout.Options{ + _, err := stdout.NewRawExporter(stdout.Options{ Quantiles: []float64{1.1, 0.9}, }) require.Error(t, err, "Invalid quantile error expected") @@ -69,7 +69,7 @@ func TestStdoutInvalidQuantile(t *testing.T) { func TestStdoutTimestamp(t *testing.T) { var buf bytes.Buffer - exporter, err := stdout.New(stdout.Options{ + exporter, err := stdout.NewRawExporter(stdout.Options{ Writer: &buf, DoNotPrintTime: false, }) diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 84fb715bd..112204dc2 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -17,29 +17,20 @@ package metric_test import ( "context" "fmt" - "time" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/stdout" - sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func ExampleNew() { - selector := simple.NewWithInexpensiveMeasure() - exporter, err := stdout.New(stdout.Options{ + pusher, err := stdout.NewExportPipeline(stdout.Options{ PrettyPrint: true, DoNotPrintTime: true, // This makes the output deterministic }) if err != nil { panic(fmt.Sprintln("Could not initialize stdout exporter:", err)) } - batcher := defaultkeys.New(selector, sdk.NewDefaultLabelEncoder(), true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() defer pusher.Stop() ctx := context.Background()