feat(sdk-metrics-base): shutdown and forceflush on MeterProvider (#2890)
Co-authored-by: Valentin Marchaud <contact@vmarchaud.fr>
This commit is contained in:
parent
322dabe86f
commit
7086d5aa71
|
@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented
|
|||
* feat(prometheus): update prometheus exporter with wip metrics sdk #2824 @legendecas
|
||||
* feat(instrumentation-xhr): add applyCustomAttributesOnSpan hook #2134 @mhennoch
|
||||
* feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan
|
||||
* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas
|
||||
|
||||
### :bug: (Bug Fix)
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import { Aggregation } from './view/Aggregation';
|
|||
import { FilteringAttributesProcessor } from './view/AttributesProcessor';
|
||||
import { InstrumentType } from './InstrumentDescriptor';
|
||||
import { PatternPredicate } from './view/Predicate';
|
||||
import { ForceFlushOptions, ShutdownOptions } from './types';
|
||||
|
||||
/**
|
||||
* MeterProviderOptions provides an interface for configuring a MeterProvider.
|
||||
|
@ -163,59 +164,36 @@ export class MeterProvider implements metrics.MeterProvider {
|
|||
/**
|
||||
* Flush all buffered data and shut down the MeterProvider and all registered
|
||||
* MetricReaders.
|
||||
* Returns a promise which is resolved when all flushes are complete.
|
||||
*
|
||||
* TODO: return errors to caller somehow?
|
||||
* Returns a promise which is resolved when all flushes are complete.
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#shutdown
|
||||
|
||||
async shutdown(options?: ShutdownOptions): Promise<void> {
|
||||
if (this._shutdown) {
|
||||
api.diag.warn('shutdown may only be called once per MeterProvider');
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO add a timeout - spec leaves it up the the SDK if this is configurable
|
||||
this._shutdown = true;
|
||||
|
||||
for (const collector of this._sharedState.metricCollectors) {
|
||||
try {
|
||||
await collector.shutdown();
|
||||
} catch (e) {
|
||||
// Log all Errors.
|
||||
if (e instanceof Error) {
|
||||
api.diag.error(`Error shutting down: ${e.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
await Promise.all(this._sharedState.metricCollectors.map(collector => {
|
||||
return collector.shutdown(options);
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies all registered MetricReaders to flush any buffered data.
|
||||
* Returns a promise which is resolved when all flushes are complete.
|
||||
*
|
||||
* TODO: return errors to caller somehow?
|
||||
* Returns a promise which is resolved when all flushes are complete.
|
||||
*/
|
||||
async forceFlush(): Promise<void> {
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#forceflush
|
||||
|
||||
// TODO add a timeout - spec leaves it up the the SDK if this is configurable
|
||||
|
||||
async forceFlush(options?: ForceFlushOptions): Promise<void> {
|
||||
// do not flush after shutdown
|
||||
if (this._shutdown) {
|
||||
api.diag.warn('invalid attempt to force flush after shutdown');
|
||||
api.diag.warn('invalid attempt to force flush after MeterProvider shutdown');
|
||||
return;
|
||||
}
|
||||
|
||||
for (const collector of this._sharedState.metricCollectors) {
|
||||
try {
|
||||
await collector.forceFlush();
|
||||
} catch (e) {
|
||||
// Log all Errors.
|
||||
if (e instanceof Error) {
|
||||
api.diag.error(`Error flushing: ${e.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
await Promise.all(this._sharedState.metricCollectors.map(collector => {
|
||||
return collector.forceFlush(options);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,17 +19,7 @@ import { AggregationTemporality } from './AggregationTemporality';
|
|||
import { MetricProducer } from './MetricProducer';
|
||||
import { ResourceMetrics } from './MetricData';
|
||||
import { callWithTimeout, Maybe } from '../utils';
|
||||
|
||||
|
||||
export type ReaderOptions = {
|
||||
timeoutMillis?: number
|
||||
};
|
||||
|
||||
export type ReaderCollectionOptions = ReaderOptions;
|
||||
|
||||
export type ReaderShutdownOptions = ReaderOptions;
|
||||
|
||||
export type ReaderForceFlushOptions = ReaderOptions;
|
||||
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';
|
||||
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader
|
||||
|
||||
|
@ -53,6 +43,9 @@ export abstract class MetricReader {
|
|||
* @param metricProducer
|
||||
*/
|
||||
setMetricProducer(metricProducer: MetricProducer) {
|
||||
if (this._metricProducer) {
|
||||
throw new Error('MetricReader can not be bound to a MeterProvider again.');
|
||||
}
|
||||
this._metricProducer = metricProducer;
|
||||
this.onInitialized();
|
||||
}
|
||||
|
@ -92,7 +85,7 @@ export abstract class MetricReader {
|
|||
/**
|
||||
* Collect all metrics from the associated {@link MetricProducer}
|
||||
*/
|
||||
async collect(options?: ReaderCollectionOptions): Promise<Maybe<ResourceMetrics>> {
|
||||
async collect(options?: CollectionOptions): Promise<Maybe<ResourceMetrics>> {
|
||||
if (this._metricProducer === undefined) {
|
||||
throw new Error('MetricReader is not bound to a MetricProducer');
|
||||
}
|
||||
|
@ -117,7 +110,7 @@ export abstract class MetricReader {
|
|||
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
|
||||
* @param options options with timeout.
|
||||
*/
|
||||
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
|
||||
async shutdown(options?: ShutdownOptions): Promise<void> {
|
||||
// Do not call shutdown again if it has already been called.
|
||||
if (this._shutdown) {
|
||||
api.diag.error('Cannot call shutdown twice.');
|
||||
|
@ -140,7 +133,7 @@ export abstract class MetricReader {
|
|||
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
|
||||
* @param options options with timeout.
|
||||
*/
|
||||
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
|
||||
async forceFlush(options?: ForceFlushOptions): Promise<void> {
|
||||
if (this._shutdown) {
|
||||
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
|
||||
return;
|
||||
|
|
|
@ -19,6 +19,7 @@ import { AggregationTemporality } from '../export/AggregationTemporality';
|
|||
import { ResourceMetrics } from '../export/MetricData';
|
||||
import { MetricProducer } from '../export/MetricProducer';
|
||||
import { MetricReader } from '../export/MetricReader';
|
||||
import { ForceFlushOptions, ShutdownOptions } from '../types';
|
||||
import { MeterProviderSharedState } from './MeterProviderSharedState';
|
||||
|
||||
/**
|
||||
|
@ -46,15 +47,15 @@ export class MetricCollector implements MetricProducer {
|
|||
/**
|
||||
* Delegates for MetricReader.forceFlush.
|
||||
*/
|
||||
async forceFlush(): Promise<void> {
|
||||
await this._metricReader.forceFlush();
|
||||
async forceFlush(options?: ForceFlushOptions): Promise<void> {
|
||||
await this._metricReader.forceFlush(options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates for MetricReader.shutdown.
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
await this._metricReader.shutdown();
|
||||
async shutdown(options?: ShutdownOptions): Promise<void> {
|
||||
await this._metricReader.shutdown(options);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export type CommonReaderOptions = {
|
||||
timeoutMillis?: number
|
||||
};
|
||||
|
||||
export type CollectionOptions = CommonReaderOptions;
|
||||
|
||||
export type ShutdownOptions = CommonReaderOptions;
|
||||
|
||||
export type ForceFlushOptions = CommonReaderOptions;
|
|
@ -24,8 +24,13 @@ import {
|
|||
defaultResource
|
||||
} from './util';
|
||||
import { TestMetricReader } from './export/TestMetricReader';
|
||||
import * as sinon from 'sinon';
|
||||
|
||||
describe('MeterProvider', () => {
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
describe('constructor', () => {
|
||||
it('should construct without exceptions', () => {
|
||||
const meterProvider = new MeterProvider();
|
||||
|
@ -422,4 +427,53 @@ describe('MeterProvider', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('shutdown', () => {
|
||||
it('should shutdown all registered metric readers', async () => {
|
||||
const meterProvider = new MeterProvider({ resource: defaultResource });
|
||||
const reader1 = new TestMetricReader();
|
||||
const reader2 = new TestMetricReader();
|
||||
const reader1ShutdownSpy = sinon.spy(reader1, 'shutdown');
|
||||
const reader2ShutdownSpy = sinon.spy(reader2, 'shutdown');
|
||||
|
||||
meterProvider.addMetricReader(reader1);
|
||||
meterProvider.addMetricReader(reader2);
|
||||
|
||||
await meterProvider.shutdown({ timeoutMillis: 1234 });
|
||||
await meterProvider.shutdown();
|
||||
await meterProvider.shutdown();
|
||||
|
||||
assert.strictEqual(reader1ShutdownSpy.callCount, 1);
|
||||
assert.deepStrictEqual(reader1ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
|
||||
assert.strictEqual(reader2ShutdownSpy.callCount, 1);
|
||||
assert.deepStrictEqual(reader2ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
|
||||
});
|
||||
});
|
||||
|
||||
describe('forceFlush', () => {
|
||||
it('should forceFlush all registered metric readers', async () => {
|
||||
const meterProvider = new MeterProvider({ resource: defaultResource });
|
||||
const reader1 = new TestMetricReader();
|
||||
const reader2 = new TestMetricReader();
|
||||
const reader1ForceFlushSpy = sinon.spy(reader1, 'forceFlush');
|
||||
const reader2ForceFlushSpy = sinon.spy(reader2, 'forceFlush');
|
||||
|
||||
meterProvider.addMetricReader(reader1);
|
||||
meterProvider.addMetricReader(reader2);
|
||||
|
||||
await meterProvider.forceFlush({ timeoutMillis: 1234 });
|
||||
await meterProvider.forceFlush({ timeoutMillis: 5678 });
|
||||
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
|
||||
assert.deepStrictEqual(reader1ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
|
||||
assert.deepStrictEqual(reader1ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });
|
||||
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
|
||||
assert.deepStrictEqual(reader2ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
|
||||
assert.deepStrictEqual(reader2ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });
|
||||
|
||||
await meterProvider.shutdown();
|
||||
await meterProvider.forceFlush();
|
||||
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
|
||||
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import * as assert from 'assert';
|
||||
import { MeterProvider } from '../../src/MeterProvider';
|
||||
import { TestMetricReader } from './TestMetricReader';
|
||||
|
||||
|
||||
describe('MetricReader', () => {
|
||||
describe('setMetricProducer', () => {
|
||||
it('The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance', () => {
|
||||
const reader = new TestMetricReader();
|
||||
const meterProvider1 = new MeterProvider();
|
||||
const meterProvider2 = new MeterProvider();
|
||||
|
||||
meterProvider1.addMetricReader(reader);
|
||||
assert.throws(() => meterProvider1.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
|
||||
assert.throws(() => meterProvider2.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
|
||||
});
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue