diff --git a/api/global/internal/meter.go b/api/global/internal/meter.go index a820a1f43..723def119 100644 --- a/api/global/internal/meter.go +++ b/api/global/internal/meter.go @@ -85,7 +85,7 @@ type asyncImpl struct { instrument - callback func(func(metric.Number, []kv.KeyValue)) + runner metric.AsyncRunner } // SyncImpler is implemented by all of the sync metric @@ -245,21 +245,21 @@ func (bound *syncHandle) Unbind() { func (m *meterImpl) NewAsyncInstrument( desc metric.Descriptor, - callback func(func(metric.Number, []kv.KeyValue)), + runner metric.AsyncRunner, ) (metric.AsyncImpl, error) { m.lock.Lock() defer m.lock.Unlock() if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil { - return (*meterPtr).NewAsyncInstrument(desc, callback) + return (*meterPtr).NewAsyncInstrument(desc, runner) } inst := &asyncImpl{ instrument: instrument{ descriptor: desc, }, - callback: callback, + runner: runner, } m.asyncInsts = append(m.asyncInsts, inst) return inst, nil @@ -276,7 +276,7 @@ func (obs *asyncImpl) setDelegate(d metric.MeterImpl) { implPtr := new(metric.AsyncImpl) var err error - *implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.callback) + *implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.runner) if err != nil { // TODO: There is no standard way to deliver this error to the user. diff --git a/api/metric/api.go b/api/metric/api.go index 636a0fa06..bb8f62532 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -198,9 +198,7 @@ func (m Meter) RegisterInt64Observer(name string, callback Int64ObserverCallback } return wrapInt64ObserverInstrument( m.newAsync(name, ObserverKind, Int64NumberKind, opts, - func(observe func(Number, []kv.KeyValue)) { - callback(Int64ObserverResult{observe}) - })) + newInt64AsyncRunner(callback))) } // RegisterFloat64Observer creates a new floating point Observer with @@ -213,21 +211,16 @@ func (m Meter) RegisterFloat64Observer(name string, callback Float64ObserverCall } return wrapFloat64ObserverInstrument( m.newAsync(name, ObserverKind, Float64NumberKind, opts, - func(observe func(Number, []kv.KeyValue)) { - callback(Float64ObserverResult{observe}) - })) + newFloat64AsyncRunner(callback))) } -// Observe captures a single integer value from the associated -// instrument callback, with the given labels. -func (io Int64ObserverResult) Observe(value int64, labels ...kv.KeyValue) { - io.observe(NewInt64Number(value), labels) -} - -// Observe captures a single floating point value from the associated -// instrument callback, with the given labels. -func (fo Float64ObserverResult) Observe(value float64, labels ...kv.KeyValue) { - fo.observe(NewFloat64Number(value), labels) +// NewBatchObserver creates a new BatchObserver that supports +// making batches of observations for multiple instruments. +func (m Meter) NewBatchObserver(callback BatchObserverCallback) BatchObserver { + return BatchObserver{ + meter: m, + runner: newBatchAsyncRunner(callback), + } } // WithDescription applies provided description. diff --git a/api/metric/api_test.go b/api/metric/api_test.go index 820d286e5..1395b8687 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -210,6 +210,51 @@ func checkBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock } } +func TestBatchObserver(t *testing.T) { + mockSDK, meter := mockTest.NewMeter() + + var obs1 metric.Int64Observer + var obs2 metric.Float64Observer + + labels := []kv.KeyValue{ + kv.String("A", "B"), + kv.String("C", "D"), + } + + cb := Must(meter).NewBatchObserver( + func(result metric.BatchObserverResult) { + result.Observe(labels, + obs1.Observation(42), + obs2.Observation(42.0), + ) + }, + ) + obs1 = cb.RegisterInt64Observer("test.observer.int") + obs2 = cb.RegisterFloat64Observer("test.observer.float") + + mockSDK.RunAsyncInstruments() + + require.Len(t, mockSDK.MeasurementBatches, 1) + + impl1 := obs1.AsyncImpl().Implementation().(*mockTest.Async) + impl2 := obs2.AsyncImpl().Implementation().(*mockTest.Async) + + require.NotNil(t, impl1) + require.NotNil(t, impl2) + + got := mockSDK.MeasurementBatches[0] + require.Equal(t, labels, got.Labels) + require.Len(t, got.Measurements, 2) + + m1 := got.Measurements[0] + require.Equal(t, impl1, m1.Instrument.Implementation().(*mockTest.Async)) + require.Equal(t, 0, m1.Number.CompareNumber(metric.Int64NumberKind, fortyTwo(t, metric.Int64NumberKind))) + + m2 := got.Measurements[1] + require.Equal(t, impl2, m2.Instrument.Implementation().(*mockTest.Async)) + require.Equal(t, 0, m2.Number.CompareNumber(metric.Float64NumberKind, fortyTwo(t, metric.Float64NumberKind))) +} + func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.MeterImpl, kind metric.NumberKind, observer metric.AsyncImpl) { t.Helper() assert.Len(t, mock.MeasurementBatches, 1) @@ -256,7 +301,7 @@ func (testWrappedMeter) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl, return nil, nil } -func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(metric.Number, []kv.KeyValue))) (metric.AsyncImpl, error) { +func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ metric.AsyncRunner) (metric.AsyncImpl, error) { return nil, errors.New("Test wrap error") } diff --git a/api/metric/must.go b/api/metric/must.go index 69697a822..ecd47b0e0 100644 --- a/api/metric/must.go +++ b/api/metric/must.go @@ -20,6 +20,12 @@ type MeterMust struct { meter Meter } +// BatchObserverMust is a wrapper for BatchObserver that panics when +// any instrument constructor encounters an error. +type BatchObserverMust struct { + batch BatchObserver +} + // Must constructs a MeterMust implementation from a Meter, allowing // the application to panic when any instrument constructor yields an // error. @@ -86,3 +92,31 @@ func (mm MeterMust) RegisterFloat64Observer(name string, callback Float64Observe return inst } } + +// NewBatchObserver returns a wrapper around BatchObserver that panics +// when any instrument constructor returns an error. +func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust { + return BatchObserverMust{ + batch: mm.meter.NewBatchObserver(callback), + } +} + +// RegisterInt64Observer calls `BatchObserver.RegisterInt64Observer` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterInt64Observer(name string, oos ...Option) Int64Observer { + if inst, err := bm.batch.RegisterInt64Observer(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} + +// RegisterFloat64Observer calls `BatchObserver.RegisterFloat64Observer` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterFloat64Observer(name string, oos ...Option) Float64Observer { + if inst, err := bm.batch.RegisterFloat64Observer(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} diff --git a/api/metric/observer.go b/api/metric/observer.go index e4b5b47c8..e6ab3af02 100644 --- a/api/metric/observer.go +++ b/api/metric/observer.go @@ -14,13 +14,20 @@ package metric +import "go.opentelemetry.io/otel/api/kv" + // Int64ObserverCallback is a type of callback that integral // observers run. -type Int64ObserverCallback func(result Int64ObserverResult) +type Int64ObserverCallback func(Int64ObserverResult) // Float64ObserverCallback is a type of callback that floating point // observers run. -type Float64ObserverCallback func(result Float64ObserverResult) +type Float64ObserverCallback func(Float64ObserverResult) + +// BatchObserverCallback is a callback argument for use with any +// Observer instrument that will be reported as a batch of +// observations. +type BatchObserverCallback func(BatchObserverResult) // Int64Observer is a metric that captures a set of int64 values at a // point in time. @@ -33,3 +40,205 @@ type Int64Observer struct { type Float64Observer struct { asyncInstrument } + +// BatchObserver represents an Observer callback that can report +// observations for multiple instruments. +type BatchObserver struct { + meter Meter + runner AsyncBatchRunner +} + +// Int64ObserverResult is passed to an observer callback to capture +// observations for one asynchronous integer metric instrument. +type Int64ObserverResult struct { + instrument AsyncImpl + function func([]kv.KeyValue, ...Observation) +} + +// Float64ObserverResult is passed to an observer callback to capture +// observations for one asynchronous floating point metric instrument. +type Float64ObserverResult struct { + instrument AsyncImpl + function func([]kv.KeyValue, ...Observation) +} + +// BatchObserverResult is passed to a batch observer callback to +// capture observations for multiple asynchronous instruments. +type BatchObserverResult struct { + function func([]kv.KeyValue, ...Observation) +} + +// AsyncRunner is expected to convert into an AsyncSingleRunner or an +// AsyncBatchRunner. SDKs will encounter an error if the AsyncRunner +// does not satisfy one of these interfaces. +type AsyncRunner interface { + // anyRunner() is a non-exported method with no functional use + // other than to make this a non-empty interface. + anyRunner() +} + +// AsyncSingleRunner is an interface implemented by single-observer +// callbacks. +type AsyncSingleRunner interface { + // Run accepts a single instrument and function for capturing + // observations of that instrument. Each call to the function + // receives one captured observation. (The function accepts + // multiple observations so the same implementation can be + // used for batch runners.) + Run(single AsyncImpl, capture func([]kv.KeyValue, ...Observation)) + + AsyncRunner +} + +// AsyncBatchRunner is an interface implemented by batch-observer +// callbacks. +type AsyncBatchRunner interface { + // Run accepts a function for capturing observations of + // multiple instruments. + Run(capture func([]kv.KeyValue, ...Observation)) + + AsyncRunner +} + +// Observe captures a single integer value from the associated +// instrument callback, with the given labels. +func (ir Int64ObserverResult) Observe(value int64, labels ...kv.KeyValue) { + ir.function(labels, Observation{ + instrument: ir.instrument, + number: NewInt64Number(value), + }) +} + +// Observe captures a single floating point value from the associated +// instrument callback, with the given labels. +func (fr Float64ObserverResult) Observe(value float64, labels ...kv.KeyValue) { + fr.function(labels, Observation{ + instrument: fr.instrument, + number: NewFloat64Number(value), + }) +} + +// Observe captures a multiple observations from the associated batch +// instrument callback, with the given labels. +func (br BatchObserverResult) Observe(labels []kv.KeyValue, obs ...Observation) { + br.function(labels, obs...) +} + +// Observation is used for reporting a batch of metric +// values. Instances of this type should be created by Observer +// instruments (e.g., Int64Observer.Observation()). +type Observation struct { + // number needs to be aligned for 64-bit atomic operations. + number Number + instrument AsyncImpl +} + +// AsyncImpl returns the instrument that created this observation. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (m Observation) AsyncImpl() AsyncImpl { + return m.instrument +} + +// Number returns a number recorded in this observation. +func (m Observation) Number() Number { + return m.number +} + +// RegisterInt64Observer creates a new integer Observer instrument +// with the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterInt64Observer(name string, opts ...Option) (Int64Observer, error) { + if b.runner == nil { + return wrapInt64ObserverInstrument(NoopAsync{}, nil) + } + return wrapInt64ObserverInstrument( + b.meter.newAsync(name, ObserverKind, Int64NumberKind, opts, b.runner)) +} + +// RegisterFloat64Observer creates a new floating point Observer with +// the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterFloat64Observer(name string, opts ...Option) (Float64Observer, error) { + if b.runner == nil { + return wrapFloat64ObserverInstrument(NoopAsync{}, nil) + } + return wrapFloat64ObserverInstrument( + b.meter.newAsync(name, ObserverKind, Float64NumberKind, opts, + b.runner)) +} + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (i Int64Observer) Observation(v int64) Observation { + return Observation{ + number: NewInt64Number(v), + instrument: i.instrument, + } +} + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (f Float64Observer) Observation(v float64) Observation { + return Observation{ + number: NewFloat64Number(v), + instrument: f.instrument, + } +} + +var _ AsyncSingleRunner = (*Int64ObserverCallback)(nil) +var _ AsyncSingleRunner = (*Float64ObserverCallback)(nil) +var _ AsyncBatchRunner = (*BatchObserverCallback)(nil) + +// newInt64AsyncRunner returns a single-observer callback for integer Observer instruments. +func newInt64AsyncRunner(c Int64ObserverCallback) AsyncSingleRunner { + return &c +} + +// newFloat64AsyncRunner returns a single-observer callback for floating point Observer instruments. +func newFloat64AsyncRunner(c Float64ObserverCallback) AsyncSingleRunner { + return &c +} + +// newBatchAsyncRunner returns a batch-observer callback use with multiple Observer instruments. +func newBatchAsyncRunner(c BatchObserverCallback) AsyncBatchRunner { + return &c +} + +// anyRunner implements AsyncRunner. +func (*Int64ObserverCallback) anyRunner() {} + +// anyRunner implements AsyncRunner. +func (*Float64ObserverCallback) anyRunner() {} + +// anyRunner implements AsyncRunner. +func (*BatchObserverCallback) anyRunner() {} + +// Run implements AsyncSingleRunner. +func (i *Int64ObserverCallback) Run(impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { + (*i)(Int64ObserverResult{ + instrument: impl, + function: function, + }) +} + +// Run implements AsyncSingleRunner. +func (f *Float64ObserverCallback) Run(impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { + (*f)(Float64ObserverResult{ + instrument: impl, + function: function, + }) +} + +// Run implements AsyncBatchRunner. +func (b *BatchObserverCallback) Run(function func([]kv.KeyValue, ...Observation)) { + (*b)(BatchObserverResult{ + function: function, + }) +} diff --git a/api/metric/registry/registry.go b/api/metric/registry/registry.go index 110ebe173..3a66b6903 100644 --- a/api/metric/registry/registry.go +++ b/api/metric/registry/registry.go @@ -32,6 +32,8 @@ type uniqueInstrumentMeterImpl struct { state map[key]metric.InstrumentImpl } +var _ metric.MeterImpl = (*uniqueInstrumentMeterImpl)(nil) + type key struct { name string libraryName string @@ -42,8 +44,6 @@ type key struct { var ErrMetricKindMismatch = fmt.Errorf( "A metric was already registered by this name with another kind or number type") -var _ metric.MeterImpl = (*uniqueInstrumentMeterImpl)(nil) - // NewUniqueInstrumentMeterImpl returns a wrapped metric.MeterImpl with // the addition of uniqueness checking. func NewUniqueInstrumentMeterImpl(impl metric.MeterImpl) metric.MeterImpl { @@ -125,7 +125,7 @@ func (u *uniqueInstrumentMeterImpl) NewSyncInstrument(descriptor metric.Descript // NewAsyncInstrument implements metric.MeterImpl. func (u *uniqueInstrumentMeterImpl) NewAsyncInstrument( descriptor metric.Descriptor, - callback func(func(metric.Number, []kv.KeyValue)), + runner metric.AsyncRunner, ) (metric.AsyncImpl, error) { u.lock.Lock() defer u.lock.Unlock() @@ -138,7 +138,7 @@ func (u *uniqueInstrumentMeterImpl) NewAsyncInstrument( return impl.(metric.AsyncImpl), nil } - asyncInst, err := u.impl.NewAsyncInstrument(descriptor, callback) + asyncInst, err := u.impl.NewAsyncInstrument(descriptor, runner) if err != nil { return nil, err } diff --git a/api/metric/sdkhelpers.go b/api/metric/sdkhelpers.go index 2024c2ccb..9694537db 100644 --- a/api/metric/sdkhelpers.go +++ b/api/metric/sdkhelpers.go @@ -36,7 +36,7 @@ type MeterImpl interface { // one occur. NewAsyncInstrument( descriptor Descriptor, - callback func(func(Number, []kv.KeyValue)), + runner AsyncRunner, ) (AsyncImpl, error) } @@ -83,18 +83,6 @@ type AsyncImpl interface { InstrumentImpl } -// Int64ObserverResult is passed to an observer callback to capture -// observations for one asynchronous integer metric instrument. -type Int64ObserverResult struct { - observe func(Number, []kv.KeyValue) -} - -// Float64ObserverResult is passed to an observer callback to capture -// observations for one asynchronous floating point metric instrument. -type Float64ObserverResult struct { - observe func(Number, []kv.KeyValue) -} - // Configure is a helper that applies all the options to a Config. func Configure(opts []Option) Config { var config Config @@ -165,13 +153,13 @@ func wrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure, } // newAsync constructs one new asynchronous instrument. -func (m Meter) newAsync(name string, mkind Kind, nkind NumberKind, opts []Option, callback func(func(Number, []kv.KeyValue))) (AsyncImpl, error) { +func (m Meter) newAsync(name string, mkind Kind, nkind NumberKind, opts []Option, runner AsyncRunner) (AsyncImpl, error) { if m.impl == nil { return NoopAsync{}, nil } desc := NewDescriptor(name, mkind, nkind, opts...) desc.config.LibraryName = m.libraryName - return m.impl.NewAsyncInstrument(desc, callback) + return m.impl.NewAsyncInstrument(desc, runner) } // wrapInt64ObserverInstrument returns an `Int64Observer` from a diff --git a/internal/metric/async.go b/internal/metric/async.go new file mode 100644 index 000000000..07b7e01df --- /dev/null +++ b/internal/metric/async.go @@ -0,0 +1,160 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "errors" + "fmt" + "os" + "sync" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/metric" +) + +var ErrInvalidAsyncRunner = errors.New("unknown async runner type") + +// AsyncCollector is an interface used between the MeterImpl and the +// AsyncInstrumentState helper below. This interface is implemented by +// the SDK to provide support for running observer callbacks. +type AsyncCollector interface { + // CollectAsync passes a batch of observations to the MeterImpl. + CollectAsync([]kv.KeyValue, ...metric.Observation) +} + +// AsyncInstrumentState manages an ordered set of asynchronous +// instruments and the distinct runners, taking into account batch +// observer callbacks. +type AsyncInstrumentState struct { + lock sync.Mutex + + // errorHandler will be called in case of an invalid + // metric.AsyncRunner, i.e., one that does not implement + // either the single-or batch-runner interfaces. + errorHandler func(error) + errorOnce sync.Once + + // runnerMap keeps the set of runners that will run each + // collection interval. Singletons are entered with a real + // instrument each, batch observers are entered with a nil + // instrument, ensuring that when a singleton callback is used + // repeatedly, it is excuted repeatedly in the interval, while + // when a batch callback is used repeatedly, it only executes + // once per interval. + runnerMap map[asyncRunnerPair]struct{} + + // runners maintains the set of runners in the order they were + // registered. + runners []asyncRunnerPair + + // instruments maintains the set of instruments in the order + // they were registered. + instruments []metric.AsyncImpl +} + +// asyncRunnerPair is a map entry for Observer callback runners. +type asyncRunnerPair struct { + // runner is used as a map key here. The API ensures + // that all callbacks are pointers for this reason. + runner metric.AsyncRunner + + // inst refers to a non-nil instrument when `runner` is a + // AsyncSingleRunner. + inst metric.AsyncImpl +} + +// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for +// use by MeterImpl to manage running the set of observer callbacks in +// the correct order. +// +// errorHandler is used to print an error condition. If errorHandler +// nil, the default error handler will be used that prints to +// os.Stderr. Only the first error is passed to the handler, after +// which errors are skipped. +func NewAsyncInstrumentState(errorHandler func(error)) *AsyncInstrumentState { + if errorHandler == nil { + errorHandler = func(err error) { + fmt.Fprintln(os.Stderr, "Metrics Async state error:", err) + } + } + return &AsyncInstrumentState{ + errorHandler: errorHandler, + runnerMap: map[asyncRunnerPair]struct{}{}, + } +} + +// Instruments returns the asynchronous instruments managed by this +// object, the set that should be checkpointed after observers are +// run. +func (a *AsyncInstrumentState) Instruments() []metric.AsyncImpl { + a.lock.Lock() + defer a.lock.Unlock() + return a.instruments +} + +// Register adds a new asynchronous instrument to by managed by this +// object. This should be called during NewAsyncInstrument() and +// assumes that errors (e.g., duplicate registration) have already +// been checked. +func (a *AsyncInstrumentState) Register(inst metric.AsyncImpl, runner metric.AsyncRunner) { + a.lock.Lock() + defer a.lock.Unlock() + + a.instruments = append(a.instruments, inst) + + // asyncRunnerPair reflects this callback in the asyncRunners + // list. If this is a batch runner, the instrument is nil. + // If this is a single-Observer runner, the instrument is + // included. This ensures that batch callbacks are called + // once and single callbacks are called once per instrument. + rp := asyncRunnerPair{ + runner: runner, + } + if _, ok := runner.(metric.AsyncSingleRunner); ok { + rp.inst = inst + } + + if _, ok := a.runnerMap[rp]; !ok { + a.runnerMap[rp] = struct{}{} + a.runners = append(a.runners, rp) + } +} + +// Run executes the complete set of observer callbacks. +func (a *AsyncInstrumentState) Run(collector AsyncCollector) { + a.lock.Lock() + runners := a.runners + a.lock.Unlock() + + for _, rp := range runners { + // The runner must be a single or batch runner, no + // other implementations are possible because the + // interface has un-exported methods. + + if singleRunner, ok := rp.runner.(metric.AsyncSingleRunner); ok { + singleRunner.Run(rp.inst, collector.CollectAsync) + continue + } + + if multiRunner, ok := rp.runner.(metric.AsyncBatchRunner); ok { + multiRunner.Run(collector.CollectAsync) + continue + } + + a.errorOnce.Do(func() { + a.errorHandler(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp)) + }) + } +} diff --git a/internal/metric/mock.go b/internal/metric/mock.go index 9a50410af..4ec328585 100644 --- a/internal/metric/mock.go +++ b/internal/metric/mock.go @@ -46,8 +46,11 @@ type ( } MeterImpl struct { + lock sync.Mutex + MeasurementBatches []Batch - AsyncInstruments []*Async + + asyncInstruments *AsyncInstrumentState } Measurement struct { @@ -64,7 +67,7 @@ type ( Async struct { Instrument - callback func(func(apimetric.Number, []kv.KeyValue)) + runner apimetric.AsyncRunner } Sync struct { @@ -110,14 +113,16 @@ func (h *Handle) Unbind() { } func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []kv.KeyValue, instrument apimetric.InstrumentImpl, number apimetric.Number) { - m.recordMockBatch(ctx, labels, Measurement{ + m.collect(ctx, labels, []Measurement{{ Instrument: instrument, Number: number, - }) + }}) } func NewProvider() (*MeterImpl, apimetric.Provider) { - impl := &MeterImpl{} + impl := &MeterImpl{ + asyncInstruments: NewAsyncInstrumentState(nil), + } p := &MeterProvider{ impl: impl, unique: registry.NewUniqueInstrumentMeterImpl(impl), @@ -144,6 +149,9 @@ func NewMeter() (*MeterImpl, apimetric.Meter) { } func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncImpl, error) { + m.lock.Lock() + defer m.lock.Unlock() + return &Sync{ Instrument{ descriptor: descriptor, @@ -152,15 +160,18 @@ func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.S }, nil } -func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(apimetric.Number, []kv.KeyValue))) (apimetric.AsyncImpl, error) { +func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, runner metric.AsyncRunner) (apimetric.AsyncImpl, error) { + m.lock.Lock() + defer m.lock.Unlock() + a := &Async{ Instrument: Instrument{ descriptor: descriptor, meter: m, }, - callback: callback, + runner: runner, } - m.AsyncInstruments = append(m.AsyncInstruments, a) + m.asyncInstruments.Register(a, runner) return a, nil } @@ -173,10 +184,25 @@ func (m *MeterImpl) RecordBatch(ctx context.Context, labels []kv.KeyValue, measu Number: m.Number(), } } - m.recordMockBatch(ctx, labels, mm...) + m.collect(ctx, labels, mm) } -func (m *MeterImpl) recordMockBatch(ctx context.Context, labels []kv.KeyValue, measurements ...Measurement) { +func (m *MeterImpl) CollectAsync(labels []kv.KeyValue, obs ...metric.Observation) { + mm := make([]Measurement, len(obs)) + for i := 0; i < len(obs); i++ { + o := obs[i] + mm[i] = Measurement{ + Instrument: o.AsyncImpl(), + Number: o.Number(), + } + } + m.collect(context.Background(), labels, mm) +} + +func (m *MeterImpl) collect(ctx context.Context, labels []kv.KeyValue, measurements []Measurement) { + m.lock.Lock() + defer m.lock.Unlock() + m.MeasurementBatches = append(m.MeasurementBatches, Batch{ Ctx: ctx, Labels: labels, @@ -185,9 +211,5 @@ func (m *MeterImpl) recordMockBatch(ctx context.Context, labels []kv.KeyValue, m } func (m *MeterImpl) RunAsyncInstruments() { - for _, observer := range m.AsyncInstruments { - observer.callback(func(n apimetric.Number, labels []kv.KeyValue) { - m.doRecordSingle(context.Background(), labels, observer, n) - }) - } + m.asyncInstruments.Run(m) } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 971c58158..02174b877 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -324,6 +324,60 @@ func TestObserverCollection(t *testing.T) { }, out.Map) } +func TestObserverBatch(t *testing.T) { + ctx := context.Background() + integrator := &correctnessIntegrator{ + t: t, + } + + sdk := metricsdk.NewAccumulator(integrator) + meter := metric.WrapMeterImpl(sdk, "test") + + var floatObs metric.Float64Observer + var intObs metric.Int64Observer + var batch = Must(meter).NewBatchObserver( + func(result metric.BatchObserverResult) { + result.Observe( + []kv.KeyValue{ + kv.String("A", "B"), + }, + floatObs.Observation(1), + floatObs.Observation(-1), + intObs.Observation(-1), + intObs.Observation(1), + ) + result.Observe( + []kv.KeyValue{ + kv.String("C", "D"), + }, + floatObs.Observation(-1), + ) + result.Observe( + nil, + intObs.Observation(1), + intObs.Observation(1), + ) + }) + floatObs = batch.RegisterFloat64Observer("float.observer") + intObs = batch.RegisterInt64Observer("int.observer") + + collected := sdk.Collect(ctx) + + require.Equal(t, 4, collected) + require.Equal(t, 4, len(integrator.records)) + + out := batchTest.NewOutput(label.DefaultEncoder()) + for _, rec := range integrator.records { + _ = out.AddTo(rec) + } + require.EqualValues(t, map[string]float64{ + "float.observer/A=B": -1, + "float.observer/C=D": -1, + "int.observer/": 1, + "int.observer/A=B": 1, + }, out.Map) +} + func TestRecordBatch(t *testing.T) { ctx := context.Background() integrator := &correctnessIntegrator{ diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 48734599b..9be8b17ca 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" api "go.opentelemetry.io/otel/api/metric" + internal "go.opentelemetry.io/otel/internal/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" ) @@ -46,7 +47,9 @@ type ( // asyncInstruments is a set of // `*asyncInstrument` instances - asyncInstruments sync.Map + asyncLock sync.Mutex + asyncInstruments *internal.AsyncInstrumentState + asyncContext context.Context // currentEpoch is the current epoch number. It is // incremented in `Collect()`. @@ -129,8 +132,6 @@ type ( // recorders maps ordered labels to the pair of // labelset and recorder recorders map[label.Distinct]*labeledRecorder - - callback func(func(api.Number, []kv.KeyValue)) } labeledRecorder struct { @@ -161,7 +162,7 @@ func (s *syncInstrument) Implementation() interface{} { return s } -func (a *asyncInstrument) observe(number api.Number, labels []kv.KeyValue) { +func (a *asyncInstrument) observe(number api.Number, labels *label.Set) { if err := aggregator.RangeTest(number, &a.descriptor); err != nil { a.meter.errorHandler(err) return @@ -178,12 +179,7 @@ func (a *asyncInstrument) observe(number api.Number, labels []kv.KeyValue) { } } -func (a *asyncInstrument) getRecorder(kvs []kv.KeyValue) export.Aggregator { - // We are in a single-threaded context. Note: this assumption - // could be violated if the user added concurrency within - // their callback. - labels := label.NewSetWithSortable(kvs, &a.meter.asyncSortSlice) - +func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { lrec, ok := a.recorders[labels.Equivalent()] if ok { if lrec.observedEpoch == a.meter.currentEpoch { @@ -205,7 +201,7 @@ func (a *asyncInstrument) getRecorder(kvs []kv.KeyValue) export.Aggregator { // but will be revisited later. a.recorders[labels.Equivalent()] = &labeledRecorder{ recorder: rec, - labels: &labels, + labels: labels, observedEpoch: a.meter.currentEpoch, } return rec @@ -318,15 +314,19 @@ func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator { } return &Accumulator{ - integrator: integrator, - errorHandler: c.ErrorHandler, + integrator: integrator, + errorHandler: c.ErrorHandler, + asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler), } } +// DefaultErrorHandler is used when the user does not configure an +// error handler. Prints messages to os.Stderr. func DefaultErrorHandler(err error) { fmt.Fprintln(os.Stderr, "Metrics Accumulator error:", err) } +// NewSyncInstrument implements api.MetricImpl. func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) { return &syncInstrument{ instrument: instrument{ @@ -336,15 +336,17 @@ func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl }, nil } -func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(api.Number, []kv.KeyValue))) (api.AsyncImpl, error) { +// NewAsyncInstrument implements api.MetricImpl. +func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, runner metric.AsyncRunner) (api.AsyncImpl, error) { a := &asyncInstrument{ instrument: instrument{ descriptor: descriptor, meter: m, }, - callback: callback, } - m.asyncInstruments.Store(a, nil) + m.asyncLock.Lock() + defer m.asyncLock.Unlock() + m.asyncInstruments.Register(a, runner) return a, nil } @@ -360,13 +362,13 @@ func (m *Accumulator) Collect(ctx context.Context) int { m.collectLock.Lock() defer m.collectLock.Unlock() - checkpointed := m.collectRecords(ctx) - checkpointed += m.collectAsync(ctx) + checkpointed := m.collectSyncInstruments(ctx) + checkpointed += m.observeAsyncInstruments(ctx) m.currentEpoch++ return checkpointed } -func (m *Accumulator) collectRecords(ctx context.Context) int { +func (m *Accumulator) collectSyncInstruments(ctx context.Context) int { checkpointed := 0 m.current.Range(func(key interface{}, value interface{}) bool { @@ -409,24 +411,39 @@ func (m *Accumulator) collectRecords(ctx context.Context) int { return checkpointed } -func (m *Accumulator) collectAsync(ctx context.Context) int { - checkpointed := 0 +// CollectAsync implements internal.AsyncCollector. +func (m *Accumulator) CollectAsync(kv []kv.KeyValue, obs ...metric.Observation) { + labels := label.NewSetWithSortable(kv, &m.asyncSortSlice) - m.asyncInstruments.Range(func(key, value interface{}) bool { - a := key.(*asyncInstrument) - a.callback(a.observe) - checkpointed += m.checkpointAsync(ctx, a) - return true - }) + for _, ob := range obs { + a := ob.AsyncImpl().Implementation().(*asyncInstrument) + a.observe(ob.Number(), &labels) + } +} - return checkpointed +func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { + m.asyncLock.Lock() + defer m.asyncLock.Unlock() + + asyncCollected := 0 + m.asyncContext = ctx + + m.asyncInstruments.Run(m) + m.asyncContext = nil + + for _, inst := range m.asyncInstruments.Instruments() { + a := inst.Implementation().(*asyncInstrument) + asyncCollected += m.checkpointAsync(a) + } + + return asyncCollected } func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int { return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels) } -func (m *Accumulator) checkpointAsync(ctx context.Context, a *asyncInstrument) int { +func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { if len(a.recorders) == 0 { return 0 } @@ -435,7 +452,7 @@ func (m *Accumulator) checkpointAsync(ctx context.Context, a *asyncInstrument) i lrec := lrec epochDiff := m.currentEpoch - lrec.observedEpoch if epochDiff == 0 { - checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, lrec.labels) + checkpointed += m.checkpoint(m.asyncContext, &a.descriptor, lrec.recorder, lrec.labels) } else if epochDiff > 1 { // This is second collection cycle with no // observations for this labelset. Remove the @@ -485,6 +502,7 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur } } +// RecordOne implements api.SyncImpl. func (r *record) RecordOne(ctx context.Context, number api.Number) { if r.recorder == nil { // The instrument is disabled according to the AggregationSelector. @@ -503,6 +521,7 @@ func (r *record) RecordOne(ctx context.Context, number api.Number) { atomic.AddInt64(&r.updateCount, 1) } +// Unbind implements api.SyncImpl. func (r *record) Unbind() { r.refMapped.unref() }