diff --git a/api/global/internal/benchmark_test.go b/api/global/internal/benchmark_test.go index 5f5fb702c..107461cc1 100644 --- a/api/global/internal/benchmark_test.go +++ b/api/global/internal/benchmark_test.go @@ -25,7 +25,7 @@ import ( "go.opentelemetry.io/otel/api/trace" export "go.opentelemetry.io/otel/sdk/export/metric" sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 82e64cf66..a85ffa97b 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -84,7 +84,7 @@ type Config struct { } // NewExportPipeline sets up a complete export pipeline with the recommended setup, -// using the recommended selector and standard integrator. See the pull.Options. +// using the recommended selector and standard processor. See the pull.Options. func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) { if config.Registry == nil { config.Registry = prometheus.NewRegistry() diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index 0c3d2c242..4ffadb021 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -131,7 +131,7 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller // NewExportPipeline sets up a complete export pipeline with the // recommended setup, chaining a NewRawExporter into the recommended -// selectors and integrators. +// selectors and processors. func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) { exporter, err := NewRawExporter(config) if err != nil { diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index c2a7f47c8..59e7ba8a8 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -108,7 +108,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l return newAgg, true } -// ForEach does not use ExportKindSelected: use a real Integrator to +// ForEach does not use ExportKindSelected: use a real Processor to // test ExportKind functionality. func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error { for _, r := range p.updates { diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index d0b10bdfc..84edf1b56 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -34,7 +34,7 @@ import ( metricsdk "go.opentelemetry.io/otel/sdk/export/metric" exporttrace "go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/metric/controller/push" - integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -117,8 +117,8 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } selector := simple.NewWithExactDistribution() - integrator := integrator.New(selector, metricsdk.PassThroughExporter) - pusher := push.New(integrator, exp) + processor := processor.New(selector, metricsdk.PassThroughExporter) + pusher := push.New(processor, exp) pusher.Start() ctx := context.Background() diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 72e9a585c..68dd9cad9 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) -// Integrator is responsible for deciding which kind of aggregation to +// Processor is responsible for deciding which kind of aggregation to // use (via AggregatorSelector), gathering exported results from the // SDK during collection, and deciding over which dimensions to group // the exported data. @@ -42,9 +42,9 @@ import ( // // The `Process` method is called during collection in a // single-threaded context from the SDK, after the aggregator is -// checkpointed, allowing the integrator to build the set of metrics +// checkpointed, allowing the processor to build the set of metrics // currently being exported. -type Integrator interface { +type Processor interface { // AggregatorSelector is responsible for selecting the // concrete type of Aggregator used for a metric in the SDK. // @@ -177,18 +177,18 @@ type Exporter interface { // The Context comes from the controller that initiated // collection. // - // The CheckpointSet interface refers to the Integrator that just + // The CheckpointSet interface refers to the Processor that just // completed collection. Export(context.Context, CheckpointSet) error - // ExportKindSelector is an interface used by the Integrator + // ExportKindSelector is an interface used by the Processor // in deciding whether to compute Delta or Cumulative // Aggregations when passing Records to this Exporter. ExportKindSelector } // ExportKindSelector is a sub-interface of Exporter used to indicate -// whether the Integrator should compute Delta or Cumulative +// whether the Processor should compute Delta or Cumulative // Aggregations. type ExportKindSelector interface { // ExportKindFor should return the correct ExportKind that @@ -198,7 +198,7 @@ type ExportKindSelector interface { } // CheckpointSet allows a controller to access a complete checkpoint of -// aggregated metrics from the Integrator. This is passed to the +// aggregated metrics from the Processor. This is passed to the // Exporter which may then use ForEach to iterate over the collection // of aggregated metrics. type CheckpointSet interface { @@ -219,9 +219,9 @@ type CheckpointSet interface { // Locker supports locking the checkpoint set. Collection // into the checkpoint set cannot take place (in case of a - // stateful integrator) while it is locked. + // stateful processor) while it is locked. // - // The Integrator attached to the Accumulator MUST be called + // The Processor attached to the Accumulator MUST be called // with the lock held. sync.Locker @@ -232,7 +232,7 @@ type CheckpointSet interface { } // Metadata contains the common elements for exported metric data that -// are shared by the Accumulator->Integrator and Integrator->Exporter +// are shared by the Accumulator->Processor and Processor->Exporter // steps. type Metadata struct { descriptor *metric.Descriptor @@ -241,14 +241,14 @@ type Metadata struct { } // Accumulation contains the exported data for a single metric instrument -// and label set, as prepared by an Accumulator for the Integrator. +// and label set, as prepared by an Accumulator for the Processor. type Accumulation struct { Metadata aggregator Aggregator } // Record contains the exported data for a single metric instrument -// and label set, as prepared by the Integrator for the Exporter. +// and label set, as prepared by the Processor for the Exporter. // This includes the effective start and end time for the aggregation. type Record struct { Metadata @@ -274,7 +274,7 @@ func (m Metadata) Resource() *resource.Resource { } // NewAccumulation allows Accumulator implementations to construct new -// Accumulations to send to Integrators. The Descriptor, Labels, Resource, +// Accumulations to send to Processors. The Descriptor, Labels, Resource, // and Aggregator represent aggregate metric events received over a single // collection period. func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Accumulation { @@ -294,7 +294,7 @@ func (r Accumulation) Aggregator() Aggregator { return r.aggregator } -// NewRecord allows Integrator implementations to construct export +// NewRecord allows Processor implementations to construct export // records. The Descriptor, Labels, and Aggregator represent // aggregate metric events received over a single collection period. func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record { diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 12da6906b..33d0f4953 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -25,7 +25,7 @@ import ( "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" ) type benchFixture struct { diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go index beda5f4eb..bf3620ba6 100644 --- a/sdk/metric/controller/pull/pull.go +++ b/sdk/metric/controller/pull/pull.go @@ -23,7 +23,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" sdk "go.opentelemetry.io/otel/sdk/metric" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" - integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/resource" ) @@ -32,11 +32,11 @@ import ( const DefaultCachePeriod time.Duration = 10 * time.Second // Controller manages access to a *sdk.Accumulator and -// *basic.Integrator. Use Provider() for obtaining Meters. Use +// *basic.Processor. Use Provider() for obtaining Meters. Use // Foreach() for accessing current records. type Controller struct { accumulator *sdk.Accumulator - integrator *integrator.Integrator + processor *processor.Processor provider *registry.Provider period time.Duration lastCollect time.Time @@ -53,25 +53,25 @@ func New(aselector export.AggregatorSelector, eselector export.ExportKindSelecto for _, opt := range options { opt.Apply(config) } - integrator := integrator.New(aselector, eselector) + processor := processor.New(aselector, eselector) accum := sdk.NewAccumulator( - integrator, + processor, sdk.WithResource(config.Resource), ) return &Controller{ accumulator: accum, - integrator: integrator, + processor: processor, provider: registry.NewProvider(accum), period: config.CachePeriod, - checkpoint: integrator.CheckpointSet(), + checkpoint: processor.CheckpointSet(), clock: controllerTime.RealClock{}, } } // SetClock sets the clock used for caching. For testing purposes. func (c *Controller) SetClock(clock controllerTime.Clock) { - c.integrator.Lock() - defer c.integrator.Unlock() + c.processor.Lock() + defer c.processor.Unlock() c.clock = clock } @@ -84,8 +84,8 @@ func (c *Controller) Provider() metric.Provider { // Foreach gives the caller read-locked access to the current // export.CheckpointSet. func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error { - c.integrator.RLock() - defer c.integrator.RUnlock() + c.processor.RLock() + defer c.processor.RUnlock() return c.checkpoint.ForEach(ks, f) } @@ -93,8 +93,8 @@ func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) // Collect requests a collection. The collection will be skipped if // the last collection is aged less than the CachePeriod. func (c *Controller) Collect(ctx context.Context) error { - c.integrator.Lock() - defer c.integrator.Unlock() + c.processor.Lock() + defer c.processor.Unlock() if c.period > 0 { now := c.clock.Now() @@ -106,9 +106,9 @@ func (c *Controller) Collect(ctx context.Context) error { c.lastCollect = now } - c.integrator.StartCollection() + c.processor.StartCollection() c.accumulator.Collect(ctx) - err := c.integrator.FinishCollection() - c.checkpoint = c.integrator.CheckpointSet() + err := c.processor.FinishCollection() + c.checkpoint = c.processor.CheckpointSet() return err } diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index bc264218e..c2dde5de4 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -28,7 +28,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/controller/pull" controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" selector "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index cf1089011..f82514ef0 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -25,7 +25,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" sdk "go.opentelemetry.io/otel/sdk/metric" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" - "go.opentelemetry.io/otel/sdk/metric/integrator/basic" + "go.opentelemetry.io/otel/sdk/metric/processor/basic" ) // DefaultPushPeriod is the default time interval between pushes. @@ -36,7 +36,7 @@ type Controller struct { lock sync.Mutex accumulator *sdk.Accumulator provider *registry.Provider - integrator *basic.Integrator + processor *basic.Processor exporter export.Exporter wg sync.WaitGroup ch chan struct{} @@ -60,15 +60,15 @@ func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...O c.Timeout = c.Period } - integrator := basic.New(selector, exporter) + processor := basic.New(selector, exporter) impl := sdk.NewAccumulator( - integrator, + processor, sdk.WithResource(c.Resource), ) return &Controller{ provider: registry.NewProvider(impl), accumulator: impl, - integrator: integrator, + processor: processor, exporter: exporter, ch: make(chan struct{}), period: c.Period, @@ -139,16 +139,16 @@ func (c *Controller) tick() { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() - c.integrator.Lock() - defer c.integrator.Unlock() + c.processor.Lock() + defer c.processor.Unlock() - c.integrator.StartCollection() + c.processor.StartCollection() c.accumulator.Collect(ctx) - if err := c.integrator.FinishCollection(); err != nil { + if err := c.processor.FinishCollection(); err != nil { global.Handle(err) } - if err := c.exporter.Export(ctx, c.integrator.CheckpointSet()); err != nil { + if err := c.exporter.Export(ctx, c.processor.CheckpointSet()); err != nil { global.Handle(err) } } diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 0a60f81aa..df9dbe098 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -33,8 +33,8 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/controller/push" controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" - integratorTest "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" + processorTest "go.opentelemetry.io/otel/sdk/metric/processor/test" "go.opentelemetry.io/otel/sdk/resource" ) @@ -125,7 +125,7 @@ func (e *testExporter) resetRecords() ([]export.Record, int) { func TestPushDoubleStop(t *testing.T) { fix := newFixture(t) - p := push.New(integratorTest.AggregatorSelector(), fix.exporter) + p := push.New(processorTest.AggregatorSelector(), fix.exporter) p.Start() p.Stop() p.Stop() diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 60933b247..9c5292498 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -30,8 +30,8 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" - batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" + batchTest "go.opentelemetry.io/otel/sdk/metric/processor/test" "go.opentelemetry.io/otel/sdk/resource" ) @@ -70,7 +70,7 @@ func init() { global.SetHandler(testHandler) } -type correctnessIntegrator struct { +type correctnessProcessor struct { t *testing.T *testSelector @@ -87,28 +87,28 @@ func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*expor test.AggregatorSelector().AggregatorFor(desc, aggPtrs...) } -func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) { +func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) { testHandler.Reset() - integrator := &correctnessIntegrator{ + processor := &correctnessProcessor{ t: t, testSelector: &testSelector{selector: test.AggregatorSelector()}, } accum := metricsdk.NewAccumulator( - integrator, + processor, metricsdk.WithResource(testResource), ) meter := metric.WrapMeterImpl(accum, "test") - return meter, accum, integrator + return meter, accum, processor } -func (ci *correctnessIntegrator) Process(accumulation export.Accumulation) error { +func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { ci.accumulations = append(ci.accumulations, accumulation) return nil } func TestInputRangeCounter(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) counter := Must(meter).NewInt64Counter("name.sum") @@ -118,10 +118,10 @@ func TestInputRangeCounter(t *testing.T) { checkpointed := sdk.Collect(ctx) require.Equal(t, 0, checkpointed) - integrator.accumulations = nil + processor.accumulations = nil counter.Add(ctx, 1) checkpointed = sdk.Collect(ctx) - sum, err := integrator.accumulations[0].Aggregator().(aggregation.Sum).Sum() + sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() require.Equal(t, int64(1), sum.AsInt64()) require.Equal(t, 1, checkpointed) require.Nil(t, err) @@ -130,7 +130,7 @@ func TestInputRangeCounter(t *testing.T) { func TestInputRangeUpDownCounter(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) counter := Must(meter).NewInt64UpDownCounter("name.sum") @@ -140,7 +140,7 @@ func TestInputRangeUpDownCounter(t *testing.T) { counter.Add(ctx, 1) checkpointed := sdk.Collect(ctx) - sum, err := integrator.accumulations[0].Aggregator().(aggregation.Sum).Sum() + sum, err := processor.accumulations[0].Aggregator().(aggregation.Sum).Sum() require.Equal(t, int64(1), sum.AsInt64()) require.Equal(t, 1, checkpointed) require.Nil(t, err) @@ -149,7 +149,7 @@ func TestInputRangeUpDownCounter(t *testing.T) { func TestInputRangeValueRecorder(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") @@ -162,10 +162,10 @@ func TestInputRangeValueRecorder(t *testing.T) { valuerecorder.Record(ctx, 1) valuerecorder.Record(ctx, 2) - integrator.accumulations = nil + processor.accumulations = nil checkpointed = sdk.Collect(ctx) - count, err := integrator.accumulations[0].Aggregator().(aggregation.Distribution).Count() + count, err := processor.accumulations[0].Aggregator().(aggregation.Distribution).Count() require.Equal(t, int64(2), count) require.Equal(t, 1, checkpointed) require.Nil(t, testHandler.Flush()) @@ -174,7 +174,7 @@ func TestInputRangeValueRecorder(t *testing.T) { func TestDisabledInstrument(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled") @@ -182,7 +182,7 @@ func TestDisabledInstrument(t *testing.T) { checkpointed := sdk.Collect(ctx) require.Equal(t, 0, checkpointed) - require.Equal(t, 0, len(integrator.accumulations)) + require.Equal(t, 0, len(processor.accumulations)) } func TestRecordNaN(t *testing.T) { @@ -198,7 +198,7 @@ func TestRecordNaN(t *testing.T) { func TestSDKLabelsDeduplication(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) counter := Must(meter).NewInt64Counter("name.sum") @@ -250,7 +250,7 @@ func TestSDKLabelsDeduplication(t *testing.T) { sdk.Collect(ctx) var actual [][]kv.KeyValue - for _, rec := range integrator.accumulations { + for _, rec := range processor.accumulations { sum, _ := rec.Aggregator().(aggregation.Sum).Sum() require.Equal(t, sum, metric.NewInt64Number(2)) @@ -297,7 +297,7 @@ func TestDefaultLabelEncoder(t *testing.T) { func TestObserverCollection(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) _ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(1, kv.String("A", "B")) @@ -344,10 +344,10 @@ func TestObserverCollection(t *testing.T) { collected := sdk.Collect(ctx) - require.Equal(t, collected, len(integrator.accumulations)) + require.Equal(t, collected, len(processor.accumulations)) out := batchTest.NewOutput(label.DefaultEncoder()) - for _, rec := range integrator.accumulations { + for _, rec := range processor.accumulations { require.NoError(t, out.AddAccumulation(rec)) } require.EqualValues(t, map[string]float64{ @@ -370,7 +370,7 @@ func TestObserverCollection(t *testing.T) { func TestSumObserverInputRange(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) // TODO: these tests are testing for negative values, not for _descending values_. Fix. _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { @@ -389,7 +389,7 @@ func TestSumObserverInputRange(t *testing.T) { collected := sdk.Collect(ctx) require.Equal(t, 0, collected) - require.Equal(t, 0, len(integrator.accumulations)) + require.Equal(t, 0, len(processor.accumulations)) // check that the error condition was reset require.NoError(t, testHandler.Flush()) @@ -397,7 +397,7 @@ func TestSumObserverInputRange(t *testing.T) { func TestObserverBatch(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) var floatValueObs metric.Float64ValueObserver var intValueObs metric.Int64ValueObserver @@ -447,10 +447,10 @@ func TestObserverBatch(t *testing.T) { collected := sdk.Collect(ctx) - require.Equal(t, collected, len(integrator.accumulations)) + require.Equal(t, collected, len(processor.accumulations)) out := batchTest.NewOutput(label.DefaultEncoder()) - for _, rec := range integrator.accumulations { + for _, rec := range processor.accumulations { require.NoError(t, out.AddAccumulation(rec)) } require.EqualValues(t, map[string]float64{ @@ -473,7 +473,7 @@ func TestObserverBatch(t *testing.T) { func TestRecordBatch(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) counter1 := Must(meter).NewInt64Counter("int64.sum") counter2 := Must(meter).NewFloat64Counter("float64.sum") @@ -495,7 +495,7 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) out := batchTest.NewOutput(label.DefaultEncoder()) - for _, rec := range integrator.accumulations { + for _, rec := range processor.accumulations { require.NoError(t, out.AddAccumulation(rec)) } require.EqualValues(t, map[string]float64{ @@ -511,7 +511,7 @@ func TestRecordBatch(t *testing.T) { // that its encoded labels will be cached across collection intervals. func TestRecordPersistence(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) c := Must(meter).NewFloat64Counter("name.sum") b := c.Bind(kv.String("bound", "true")) @@ -523,7 +523,7 @@ func TestRecordPersistence(t *testing.T) { sdk.Collect(ctx) } - require.Equal(t, 4, integrator.newAggCount) + require.Equal(t, 4, processor.newAggCount) } func TestIncorrectInstruments(t *testing.T) { @@ -564,7 +564,7 @@ func TestIncorrectInstruments(t *testing.T) { func TestSyncInAsync(t *testing.T) { ctx := context.Background() - meter, sdk, integrator := newSDK(t) + meter, sdk, processor := newSDK(t) counter := Must(meter).NewFloat64Counter("counter.sum") _ = Must(meter).NewInt64ValueObserver("observer.lastvalue", @@ -577,7 +577,7 @@ func TestSyncInAsync(t *testing.T) { sdk.Collect(ctx) out := batchTest.NewOutput(label.DefaultEncoder()) - for _, rec := range integrator.accumulations { + for _, rec := range processor.accumulations { require.NoError(t, out.AddAccumulation(rec)) } require.EqualValues(t, map[string]float64{ diff --git a/sdk/metric/doc.go b/sdk/metric/doc.go index e2d5e41a0..cdf4ac1a6 100644 --- a/sdk/metric/doc.go +++ b/sdk/metric/doc.go @@ -94,22 +94,22 @@ Aggregators implement a Merge method, also called in collection context, that combines state from two aggregators into one. Each SDK record has an associated aggregator. -Integrator is an interface which sits between the SDK and an exporter. -The Integrator embeds an AggregatorSelector, used by the SDK to assign -new Aggregators. The Integrator supports a Process() API for submitting -checkpointed aggregators to the integrator, and a CheckpointSet() API +Processor is an interface which sits between the SDK and an exporter. +The Processor embeds an AggregatorSelector, used by the SDK to assign +new Aggregators. The Processor supports a Process() API for submitting +checkpointed aggregators to the processor, and a CheckpointSet() API for producing a complete checkpoint for the exporter. Two default -Integrator implementations are provided, the "defaultkeys" Integrator groups +Processor implementations are provided, the "defaultkeys" Processor groups aggregate metrics by their recommended Descriptor.Keys(), the -"simple" Integrator aggregates metrics at full dimensionality. +"simple" Processor aggregates metrics at full dimensionality. LabelEncoder is an optional optimization that allows an exporter to provide the serialization logic for labels. This allows avoiding duplicate serialization of labels, once as a unique key in the SDK (or -Integrator) and once in the exporter. +Processor) and once in the exporter. -CheckpointSet is an interface between the Integrator and the Exporter. -After completing a collection pass, the Integrator.CheckpointSet() method +CheckpointSet is an interface between the Processor and the Exporter. +After completing a collection pass, the Processor.CheckpointSet() method returns a CheckpointSet, which the Exporter uses to iterate over all the updated metrics. diff --git a/sdk/metric/integrator/basic/basic.go b/sdk/metric/processor/basic/basic.go similarity index 94% rename from sdk/metric/integrator/basic/basic.go rename to sdk/metric/processor/basic/basic.go index 402d72523..1a3f611ed 100644 --- a/sdk/metric/integrator/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package basic // import "go.opentelemetry.io/otel/sdk/metric/integrator/basic" +package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic" import ( "errors" @@ -28,7 +28,7 @@ import ( ) type ( - Integrator struct { + Processor struct { export.ExportKindSelector export.AggregatorSelector @@ -102,19 +102,19 @@ type ( } ) -var _ export.Integrator = &Integrator{} +var _ export.Processor = &Processor{} var _ export.CheckpointSet = &state{} -var ErrInconsistentState = fmt.Errorf("inconsistent integrator state") +var ErrInconsistentState = fmt.Errorf("inconsistent processor state") var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind") -// New returns a basic Integrator using the provided +// New returns a basic Processor using the provided // AggregatorSelector to select Aggregators. The ExportKindSelector // is consulted to determine the kind(s) of exporter that will consume -// data, so that this Integrator can prepare to compute Delta or +// data, so that this Processor can prepare to compute Delta or // Cumulative Aggregations as needed. -func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector) *Integrator { +func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector) *Processor { now := time.Now() - return &Integrator{ + return &Processor{ AggregatorSelector: aselector, ExportKindSelector: eselector, state: state{ @@ -125,8 +125,8 @@ func New(aselector export.AggregatorSelector, eselector export.ExportKindSelecto } } -// Process implements export.Integrator. -func (b *Integrator) Process(accum export.Accumulation) error { +// Process implements export.Processor. +func (b *Processor) Process(accum export.Accumulation) error { if b.startedCollection != b.finishedCollection+1 { return ErrInconsistentState } @@ -266,23 +266,23 @@ func (b *Integrator) Process(accum export.Accumulation) error { // CheckpointSet Locker interface to synchronize access to this // object. The CheckpointSet.ForEach() method cannot be called // concurrently with Process(). -func (b *Integrator) CheckpointSet() export.CheckpointSet { +func (b *Processor) CheckpointSet() export.CheckpointSet { return &b.state } -// StartCollection signals to the Integrator one or more Accumulators +// StartCollection signals to the Processor one or more Accumulators // will begin calling Process() calls during collection. -func (b *Integrator) StartCollection() { +func (b *Processor) StartCollection() { if b.startedCollection != 0 { b.intervalStart = b.intervalEnd } b.startedCollection++ } -// FinishCollection signals to the Integrator that a complete +// FinishCollection signals to the Processor that a complete // collection has finished and that ForEach will be called to access // the CheckpointSet. -func (b *Integrator) FinishCollection() error { +func (b *Processor) FinishCollection() error { b.intervalEnd = time.Now() if b.startedCollection != b.finishedCollection+1 { return ErrInconsistentState diff --git a/sdk/metric/integrator/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go similarity index 95% rename from sdk/metric/integrator/basic/basic_test.go rename to sdk/metric/processor/basic/basic_test.go index 471ef97bf..cb32dfc28 100644 --- a/sdk/metric/integrator/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -35,13 +35,13 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/metric/integrator/basic" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/processor/test" "go.opentelemetry.io/otel/sdk/resource" ) -// TestIntegrator tests all the non-error paths in this package. -func TestIntegrator(t *testing.T) { +// TestProcessor tests all the non-error paths in this package. +func TestProcessor(t *testing.T) { type exportCase struct { kind export.ExportKind } @@ -164,7 +164,7 @@ func testSynchronousIntegration( for NCheckpoint := 1; NCheckpoint <= 3; NCheckpoint++ { t.Run(fmt.Sprintf("NumCkpt=%d", NCheckpoint), func(t *testing.T) { - integrator := basic.New(selector, ekind) + processor := basic.New(selector, ekind) for nc := 0; nc < NCheckpoint; nc++ { @@ -177,14 +177,14 @@ func testSynchronousIntegration( input *= cumulativeMultiplier } - integrator.StartCollection() + processor.StartCollection() for na := 0; na < NAccum; na++ { - _ = integrator.Process(updateFor(&desc1, input, labs1)) - _ = integrator.Process(updateFor(&desc2, input, labs2)) + _ = processor.Process(updateFor(&desc1, input, labs1)) + _ = processor.Process(updateFor(&desc2, input, labs2)) } - err := integrator.FinishCollection() + err := processor.FinishCollection() if err == aggregation.ErrNoSubtraction { var subr export.Aggregator selector.AggregatorFor(&desc1, &subr) @@ -201,7 +201,7 @@ func testSynchronousIntegration( continue } - checkpointSet := integrator.CheckpointSet() + checkpointSet := processor.CheckpointSet() // Test the final checkpoint state. records1 := test.NewOutput(label.DefaultEncoder()) diff --git a/sdk/metric/integrator/test/test.go b/sdk/metric/processor/test/test.go similarity index 98% rename from sdk/metric/integrator/test/test.go rename to sdk/metric/processor/test/test.go index 6af9d5c17..fe2d4eec2 100644 --- a/sdk/metric/integrator/test/test.go +++ b/sdk/metric/processor/test/test.go @@ -40,7 +40,7 @@ type ( // testAggregatorSelector returns aggregators consistent with // the test variables below, needed for testing stateful - // integrators, which clone Aggregators using AggregatorFor(desc). + // processors, which clone Aggregators using AggregatorFor(desc). testAggregatorSelector struct{} ) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index aaf7ac4b5..d00abecca 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -34,13 +34,13 @@ import ( type ( // Accumulator implements the OpenTelemetry Meter API. The - // Accumulator is bound to a single export.Integrator in + // Accumulator is bound to a single export.Processor in // `NewAccumulator()`. // // The Accumulator supports a Collect() API to gather and export // current data. Collect() should be arranged according to - // the integrator model. Push-based integrators will setup a - // timer to call Collect() periodically. Pull-based integrators + // the processor model. Push-based processors will setup a + // timer to call Collect() periodically. Pull-based processors // will call Collect() when a pull request arrives. Accumulator struct { // current maps `mapkey` to *record. @@ -55,8 +55,8 @@ type ( // incremented in `Collect()`. currentEpoch int64 - // integrator is the configured integrator+configuration. - integrator export.Integrator + // processor is the configured processor+configuration. + processor export.Processor // collectLock prevents simultaneous calls to Collect(). collectLock sync.Mutex @@ -186,7 +186,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { if lrec.observedEpoch == a.meter.currentEpoch { // last value wins for Observers, so if we see the same labels // in the current epoch, we replace the old recorder - a.meter.integrator.AggregatorFor(&a.descriptor, &lrec.observed) + a.meter.processor.AggregatorFor(&a.descriptor, &lrec.observed) } else { lrec.observedEpoch = a.meter.currentEpoch } @@ -194,7 +194,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { return lrec.observed } var rec export.Aggregator - a.meter.integrator.AggregatorFor(&a.descriptor, &rec) + a.meter.processor.AggregatorFor(&a.descriptor, &rec) if a.recorders == nil { a.recorders = make(map[label.Distinct]*labeledRecorder) } @@ -255,7 +255,7 @@ func (s *syncInstrument) acquireHandle(kvs []kv.KeyValue, labelPtr *label.Set) * rec.refMapped = refcountMapped{value: 2} rec.inst = s - s.meter.integrator.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) for { // Load/Store: there's a memory allocation to place `mk` into @@ -298,22 +298,22 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs [ } // NewAccumulator constructs a new Accumulator for the given -// integrator. This Accumulator supports only a single integrator. +// processor. This Accumulator supports only a single processor. // // The Accumulator does not start any background process to collect itself -// periodically, this responsbility lies with the integrator, typically, +// periodically, this responsbility lies with the processor, typically, // depending on the type of export. For example, a pull-based -// integrator will call Collect() when it receives a request to scrape -// current metric values. A push-based integrator should configure its +// processor will call Collect() when it receives a request to scrape +// current metric values. A push-based processor should configure its // own periodic collection. -func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator { +func NewAccumulator(processor export.Processor, opts ...Option) *Accumulator { c := &Config{} for _, opt := range opts { opt.Apply(c) } return &Accumulator{ - integrator: integrator, + processor: processor, asyncInstruments: internal.NewAsyncInstrumentState(), resource: c.Resource, } @@ -347,7 +347,7 @@ func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, runner metri // exports data for each active instrument. Collect() may not be // called concurrently. // -// During the collection pass, the export.Integrator will receive +// During the collection pass, the export.Processor will receive // one Export() call per current aggregation. // // Returns the number of records that were checkpointed. @@ -445,7 +445,7 @@ func (m *Accumulator) checkpointRecord(r *record) int { } a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint) - err = m.integrator.Process(a) + err = m.processor.Process(a) if err != nil { global.Handle(err) } @@ -463,7 +463,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { if epochDiff == 0 { if lrec.observed != nil { a := export.NewAccumulation(&a.descriptor, lrec.labels, m.resource, lrec.observed) - err := m.integrator.Process(a) + err := m.processor.Process(a) if err != nil { global.Handle(err) } diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 04715e486..e0245ae9a 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -36,7 +36,7 @@ import ( api "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/metric/processor/test" ) const (