Metrics updates (#1700)

* feat: renaming batcher to processor, fixing aggregators, adding missing metrics in api

* chore: fixing metrics in exporter collector, updated tests, fixing observer result

* chore: refactoring tests for rest of collectors

* chore: fixing monotonic sum observer, updated test to cover that scenario correctly
This commit is contained in:
Bartlomiej Obecny 2020-12-03 00:40:42 +01:00 committed by GitHub
parent b22ca638ff
commit 781b30f5bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 702 additions and 445 deletions

View File

@ -1,6 +1,6 @@
# Batcher API Guide
# Processor API Guide
[The batcher](https://github.com/open-telemetry/opentelemetry-js/blob/master/packages/opentelemetry-metrics/src/export/Batcher.ts?rgh-link-date=2020-05-25T18%3A43%3A57Z) has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported.
[The processor](https://github.com/open-telemetry/opentelemetry-js/blob/master/packages/opentelemetry-metrics/src/export/Processor.ts?rgh-link-date=2020-05-25T18%3A43%3A57Z) has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported.
## Selecting a specific aggregator for metrics
@ -41,25 +41,25 @@ export class AverageAggregator implements Aggregator {
}
```
Now we will need to implement our own batcher to configure the sdk to use our new aggregator. To simplify even more, we will just extend the `UngroupedBatcher` (which is the default) to avoid re-implementing the whole `Aggregator` interface.
Now we will need to implement our own processor to configure the sdk to use our new aggregator. To simplify even more, we will just extend the `UngroupedProcessor` (which is the default) to avoid re-implementing the whole `Aggregator` interface.
Here the result:
```ts
import {
UngroupedBatcher,
UngroupedProcessor,
MetricDescriptor,
CounterSumAggregator,
ObserverAggregator,
MeasureExactAggregator,
} from '@opentelemetry/metrics';
export class CustomBatcher extends UngroupedBatcher {
export class CustomProcessor extends UngroupedProcessor {
aggregatorFor (metricDescriptor: MetricDescriptor) {
if (metricDescriptor.name === 'requests') {
return new AverageAggregator(10);
}
// this is exactly what the "UngroupedBatcher" does, we will re-use it
// this is exactly what the "UngroupedProcessor" does, we will re-use it
// to fallback on the default behavior
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
@ -73,11 +73,11 @@ export class CustomBatcher extends UngroupedBatcher {
}
```
Finally, we need to specify to the `MeterProvider` to use our `CustomBatcher` when creating new meter:
Finally, we need to specify to the `MeterProvider` to use our `CustomProcessor` when creating new meter:
```ts
import {
UngroupedBatcher,
UngroupedProcessor,
MetricDescriptor,
CounterSumAggregator,
ObserverAggregator,
@ -115,12 +115,12 @@ export class AverageAggregator implements Aggregator {
}
}
export class CustomBatcher extends UngroupedBatcher {
export class CustomProcessor extends UngroupedProcessor {
aggregatorFor (metricDescriptor: MetricDescriptor) {
if (metricDescriptor.name === 'requests') {
return new AverageAggregator(10);
}
// this is exactly what the "UngroupedBatcher" does, we will re-use it
// this is exactly what the "UngroupedProcessor" does, we will re-use it
// to fallback on the default behavior
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
@ -134,9 +134,9 @@ export class CustomBatcher extends UngroupedBatcher {
}
const meter = new MeterProvider({
batcher: new CustomBatcher(),
processor: new CustomProcessor(),
interval: 1000,
}).getMeter('example-custom-batcher');
}).getMeter('example-custom-processor');
const requestsLatency = meter.createValueRecorder('requests', {
monotonic: true,

View File

@ -23,6 +23,8 @@ import {
BatchObserver,
BatchMetricOptions,
UpDownCounter,
SumObserver,
UpDownSumObserver,
} from './Metric';
import { ObserverResult } from './ObserverResult';
@ -81,6 +83,30 @@ export interface Meter {
callback?: (observerResult: ObserverResult) => void
): ValueObserver;
/**
* Creates a new `SumObserver` metric.
* @param name the name of the metric.
* @param [options] the metric options.
* @param [callback] the observer callback
*/
createSumObserver(
name: string,
options?: MetricOptions,
callback?: (observerResult: ObserverResult) => void
): SumObserver;
/**
* Creates a new `UpDownSumObserver` metric.
* @param name the name of the metric.
* @param [options] the metric options.
* @param [callback] the observer callback
*/
createUpDownSumObserver(
name: string,
options?: MetricOptions,
callback?: (observerResult: ObserverResult) => void
): UpDownSumObserver;
/**
* Creates a new `BatchObserver` metric, can be used to update many metrics
* at the same time and when operations needs to be async

View File

@ -26,6 +26,7 @@ import {
BatchObserver,
UpDownCounter,
BaseObserver,
UpDownSumObserver,
} from './Metric';
import {
BoundValueRecorder,
@ -84,6 +85,34 @@ export class NoopMeter implements Meter {
return NOOP_VALUE_OBSERVER_METRIC;
}
/**
* Returns constant noop sum observer.
* @param name the name of the metric.
* @param [options] the metric options.
* @param [callback] the sum observer callback
*/
createSumObserver(
_name: string,
_options?: MetricOptions,
_callback?: (observerResult: ObserverResult) => void
): ValueObserver {
return NOOP_SUM_OBSERVER_METRIC;
}
/**
* Returns constant noop up down sum observer.
* @param name the name of the metric.
* @param [options] the metric options.
* @param [callback] the up down sum observer callback
*/
createUpDownSumObserver(
_name: string,
_options?: MetricOptions,
_callback?: (observerResult: ObserverResult) => void
): UpDownSumObserver {
return NOOP_UP_DOWN_SUM_OBSERVER_METRIC;
}
/**
* Returns constant noop batch observer.
* @param name the name of the metric.

View File

@ -22,7 +22,6 @@ import * as fs from 'fs';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import { MetricRecord } from '@opentelemetry/metrics';
import { CollectorMetricExporter } from '../src';
import {
mockCounter,
@ -35,6 +34,8 @@ import {
mockValueRecorder,
} from './helper';
import { ConsoleLogger, LogLevel } from '@opentelemetry/core';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
const metricsServiceProtoPath =
'opentelemetry/proto/collector/metrics/v1/metrics_service.proto';
@ -59,7 +60,7 @@ const testCollectorMetricExporter = (params: TestParams) =>
let exportedData:
| collectorTypes.opentelemetryProto.metrics.v1.ResourceMetrics[]
| undefined;
let metrics: MetricRecord[];
let metrics: metrics.MetricRecord[];
let reqMetadata: grpc.Metadata | undefined;
before(done => {
@ -134,17 +135,23 @@ const testCollectorMetricExporter = (params: TestParams) =>
value: 1592602232694000000,
});
metrics = [];
metrics.push(await mockCounter());
metrics.push(await mockObserver());
metrics.push(await mockValueRecorder());
const counter: metrics.Metric<metrics.BoundCounter> &
api.Counter = mockCounter();
const observer: metrics.Metric<metrics.BoundObserver> &
api.ValueObserver = mockObserver(observerResult => {
observerResult.observe(3, {});
observerResult.observe(6, {});
});
const recorder: metrics.Metric<metrics.BoundValueRecorder> &
api.ValueRecorder = mockValueRecorder();
metrics[0].aggregator.update(1);
counter.add(1);
recorder.record(7);
recorder.record(14);
metrics[1].aggregator.update(3);
metrics[1].aggregator.update(6);
metrics[2].aggregator.update(7);
metrics[2].aggregator.update(14);
metrics.push((await counter.getMetricRecord())[0]);
metrics.push((await observer.getMetricRecord())[0]);
metrics.push((await recorder.getMetricRecord())[0]);
});
afterEach(() => {

View File

@ -14,15 +14,15 @@
* limitations under the License.
*/
import { TraceFlags, ValueType, StatusCode } from '@opentelemetry/api';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { ReadableSpan } from '@opentelemetry/tracing';
import { Resource } from '@opentelemetry/resources';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import * as assert from 'assert';
import { MetricRecord, MeterProvider } from '@opentelemetry/metrics';
import * as grpc from 'grpc';
const meterProvider = new MeterProvider({
const meterProvider = new metrics.MeterProvider({
interval: 30000,
resource: new Resource({
service: 'ui',
@ -54,61 +54,52 @@ const traceIdArr = [
const spanIdArr = [94, 16, 114, 97, 246, 79, 165, 62];
const parentIdArr = [120, 168, 145, 80, 152, 134, 67, 136];
export async function mockCounter(): Promise<MetricRecord> {
export function mockCounter(): metrics.Metric<metrics.BoundCounter> &
api.Counter {
const name = 'int-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockDoubleCounter(): Promise<MetricRecord> {
const name = 'double-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.DOUBLE,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
}
export async function mockObserver(): Promise<MetricRecord> {
export function mockObserver(
callback: (observerResult: api.ObserverResult) => void
): metrics.Metric<metrics.BoundCounter> & api.ValueObserver {
const name = 'double-observer';
const metric =
meter['_metrics'].get(name) ||
meter.createValueObserver(name, {
description: 'sample observer description',
valueType: ValueType.DOUBLE,
});
meter.createValueObserver(
name,
{
description: 'sample observer description',
valueType: api.ValueType.DOUBLE,
},
callback
);
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockValueRecorder(): Promise<MetricRecord> {
export function mockValueRecorder(): metrics.Metric<metrics.BoundValueRecorder> &
api.ValueRecorder {
const name = 'int-recorder';
const metric =
meter['_metrics'].get(name) ||
meter.createValueRecorder(name, {
description: 'sample recorder description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
boundaries: [0, 100],
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export const mockedReadableSpan: ReadableSpan = {
@ -117,13 +108,13 @@ export const mockedReadableSpan: ReadableSpan = {
spanContext: {
traceId: '1f1008dc8e270e85c40a0d7c3939b278',
spanId: '5e107261f64fa53e',
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: '78a8915098864388',
startTime: [1574120165, 429803070],
endTime: [1574120165, 438688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: { component: 'document-load' },
links: [
{

View File

@ -14,6 +14,8 @@
* limitations under the License.
*/
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import * as core from '@opentelemetry/core';
import * as http from 'http';
@ -32,7 +34,6 @@ import {
ensureExportedValueRecorderIsCorrect,
MockedResponse,
} from './helper';
import { MetricRecord } from '@opentelemetry/metrics';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { CollectorExporterError } from '@opentelemetry/exporter-collector/build/src/types';
@ -50,7 +51,7 @@ describe('CollectorMetricExporter - node with proto over http', () => {
let collectorExporterConfig: collectorTypes.CollectorExporterConfigBase;
let spyRequest: sinon.SinonSpy;
let spyWrite: sinon.SinonSpy;
let metrics: MetricRecord[];
let metrics: metrics.MetricRecord[];
describe('export', () => {
beforeEach(async () => {
spyRequest = sinon.stub(http, 'request').returns(fakeRequest as any);
@ -71,14 +72,23 @@ describe('CollectorMetricExporter - node with proto over http', () => {
value: 1592602232694000000,
});
metrics = [];
metrics.push(await mockCounter());
metrics.push(await mockObserver());
metrics.push(await mockValueRecorder());
metrics[0].aggregator.update(1);
metrics[1].aggregator.update(3);
metrics[1].aggregator.update(6);
metrics[2].aggregator.update(7);
metrics[2].aggregator.update(14);
const counter: metrics.Metric<metrics.BoundCounter> &
api.Counter = mockCounter();
const observer: metrics.Metric<metrics.BoundObserver> &
api.ValueObserver = mockObserver(observerResult => {
observerResult.observe(3, {});
observerResult.observe(6, {});
});
const recorder: metrics.Metric<metrics.BoundValueRecorder> &
api.ValueRecorder = mockValueRecorder();
counter.add(1);
recorder.record(7);
recorder.record(14);
metrics.push((await counter.getMetricRecord())[0]);
metrics.push((await observer.getMetricRecord())[0]);
metrics.push((await recorder.getMetricRecord())[0]);
});
afterEach(() => {
spyRequest.restore();

View File

@ -14,16 +14,16 @@
* limitations under the License.
*/
import { TraceFlags, ValueType, StatusCode } from '@opentelemetry/api';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { hexToBase64 } from '@opentelemetry/core';
import { ReadableSpan } from '@opentelemetry/tracing';
import { Resource } from '@opentelemetry/resources';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import * as assert from 'assert';
import { MeterProvider, MetricRecord } from '@opentelemetry/metrics';
import { Stream } from 'stream';
const meterProvider = new MeterProvider({
const meterProvider = new metrics.MeterProvider({
interval: 30000,
resource: new Resource({
service: 'ui',
@ -34,61 +34,52 @@ const meterProvider = new MeterProvider({
const meter = meterProvider.getMeter('default', '0.0.1');
export async function mockCounter(): Promise<MetricRecord> {
export function mockCounter(): metrics.Metric<metrics.BoundCounter> &
api.Counter {
const name = 'int-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockDoubleCounter(): Promise<MetricRecord> {
const name = 'double-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.DOUBLE,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
}
export async function mockObserver(): Promise<MetricRecord> {
export function mockObserver(
callback: (observerResult: api.ObserverResult) => void
): metrics.Metric<metrics.BoundCounter> & api.ValueObserver {
const name = 'double-observer';
const metric =
meter['_metrics'].get(name) ||
meter.createValueObserver(name, {
description: 'sample observer description',
valueType: ValueType.DOUBLE,
});
meter.createValueObserver(
name,
{
description: 'sample observer description',
valueType: api.ValueType.DOUBLE,
},
callback
);
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockValueRecorder(): Promise<MetricRecord> {
export function mockValueRecorder(): metrics.Metric<metrics.BoundValueRecorder> &
api.ValueRecorder {
const name = 'int-recorder';
const metric =
meter['_metrics'].get(name) ||
meter.createValueRecorder(name, {
description: 'sample recorder description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
boundaries: [0, 100],
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
const traceIdHex = '1f1008dc8e270e85c40a0d7c3939b278';
@ -101,13 +92,13 @@ export const mockedReadableSpan: ReadableSpan = {
spanContext: {
traceId: traceIdHex,
spanId: spanIdHex,
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: parentIdHex,
startTime: [1574120165, 429803070],
endTime: [1574120165, 438688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: { component: 'document-load' },
links: [
{

View File

@ -136,50 +136,42 @@ export function toCollectorMetric(
unit: metric.descriptor.unit,
};
switch (metric.aggregator.kind) {
case AggregatorKind.SUM:
{
const result = {
dataPoints: [toDataPoint(metric, startTime)],
isMonotonic:
metric.descriptor.metricKind === MetricKind.COUNTER ||
metric.descriptor.metricKind === MetricKind.SUM_OBSERVER,
aggregationTemporality: toAggregationTemporality(metric),
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intSum = result;
} else {
metricCollector.doubleSum = result;
}
}
break;
case AggregatorKind.LAST_VALUE:
{
const result = {
dataPoints: [toDataPoint(metric, startTime)],
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intGauge = result;
} else {
metricCollector.doubleGauge = result;
}
}
break;
case AggregatorKind.HISTOGRAM:
{
const result = {
dataPoints: [toHistogramPoint(metric, startTime)],
aggregationTemporality: toAggregationTemporality(metric),
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intHistogram = result;
} else {
metricCollector.doubleHistogram = result;
}
}
break;
if (
metric.aggregator.kind === AggregatorKind.SUM ||
metric.descriptor.metricKind === MetricKind.SUM_OBSERVER ||
metric.descriptor.metricKind === MetricKind.UP_DOWN_SUM_OBSERVER
) {
const result = {
dataPoints: [toDataPoint(metric, startTime)],
isMonotonic:
metric.descriptor.metricKind === MetricKind.COUNTER ||
metric.descriptor.metricKind === MetricKind.SUM_OBSERVER,
aggregationTemporality: toAggregationTemporality(metric),
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intSum = result;
} else {
metricCollector.doubleSum = result;
}
} else if (metric.aggregator.kind === AggregatorKind.LAST_VALUE) {
const result = {
dataPoints: [toDataPoint(metric, startTime)],
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intGauge = result;
} else {
metricCollector.doubleGauge = result;
}
} else if (metric.aggregator.kind === AggregatorKind.HISTOGRAM) {
const result = {
dataPoints: [toHistogramPoint(metric, startTime)],
aggregationTemporality: toAggregationTemporality(metric),
};
if (metric.descriptor.valueType === api.ValueType.INT) {
metricCollector.intHistogram = result;
} else {
metricCollector.doubleHistogram = result;
}
}
return metricCollector;

View File

@ -14,13 +14,20 @@
* limitations under the License.
*/
import * as api from '@opentelemetry/api';
import { ExportResultCode, NoopLogger } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { CollectorMetricExporter } from '../../src/platform/browser/index';
import { CollectorExporterConfigBase } from '../../src/types';
import * as collectorTypes from '../../src/types';
import { MetricRecord } from '@opentelemetry/metrics';
import {
BoundCounter,
BoundObserver,
BoundValueRecorder,
Metric,
MetricRecord,
} from '@opentelemetry/metrics';
import {
mockCounter,
mockObserver,
@ -48,15 +55,23 @@ describe('CollectorMetricExporter - web', () => {
spySend = sinon.stub(XMLHttpRequest.prototype, 'send');
spyBeacon = sinon.stub(navigator, 'sendBeacon');
metrics = [];
metrics.push(await mockCounter());
metrics.push(await mockObserver());
metrics.push(await mockValueRecorder());
const counter: Metric<BoundCounter> & api.Counter = mockCounter();
const observer: Metric<BoundObserver> & api.ValueObserver = mockObserver(
observerResult => {
observerResult.observe(3, {});
observerResult.observe(6, {});
},
'double-observer2'
);
const recorder: Metric<BoundValueRecorder> &
api.ValueRecorder = mockValueRecorder();
counter.add(1);
recorder.record(7);
recorder.record(14);
metrics[0].aggregator.update(1);
metrics[1].aggregator.update(3);
metrics[1].aggregator.update(6);
metrics[2].aggregator.update(7);
metrics[2].aggregator.update(14);
metrics.push((await counter.getMetricRecord())[0]);
metrics.push((await observer.getMetricRecord())[0]);
metrics.push((await recorder.getMetricRecord())[0]);
});
afterEach(() => {
@ -111,7 +126,9 @@ describe('CollectorMetricExporter - web', () => {
if (metric2) {
ensureObserverIsCorrect(
metric2,
hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp)
hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp),
6,
'double-observer2'
);
}
@ -225,7 +242,9 @@ describe('CollectorMetricExporter - web', () => {
if (metric2) {
ensureObserverIsCorrect(
metric2,
hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp)
hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp),
6,
'double-observer2'
);
}

View File

@ -14,12 +14,18 @@
* limitations under the License.
*/
import * as api from '@opentelemetry/api';
import { ExportResultCode, NoopLogger } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { CollectorExporterBase } from '../../src/CollectorExporterBase';
import { CollectorExporterConfigBase } from '../../src/types';
import { MetricRecord } from '@opentelemetry/metrics';
import {
BoundCounter,
BoundObserver,
Metric,
MetricRecord,
} from '@opentelemetry/metrics';
import { mockCounter, mockObserver } from '../helper';
import * as collectorTypes from '../../src/types';
@ -63,8 +69,18 @@ describe('CollectorMetricExporter - common', () => {
};
collectorExporter = new CollectorMetricExporter(collectorExporterConfig);
metrics = [];
metrics.push(await mockCounter());
metrics.push(await mockObserver());
const counter: Metric<BoundCounter> & api.Counter = mockCounter();
const observer: Metric<BoundObserver> & api.ValueObserver = mockObserver(
observerResult => {
observerResult.observe(3, {});
observerResult.observe(6, {});
},
'double-observer3'
);
counter.add(1);
metrics.push((await counter.getMetricRecord())[0]);
metrics.push((await observer.getMetricRecord())[0]);
});
afterEach(() => {

View File

@ -28,54 +28,128 @@ import {
mockedInstrumentationLibraries,
multiResourceMetricsGet,
multiInstrumentationLibraryMetricsGet,
mockSumObserver,
mockUpDownSumObserver,
ensureSumObserverIsCorrect,
ensureUpDownSumObserverIsCorrect,
} from '../helper';
import { MetricRecord, SumAggregator } from '@opentelemetry/metrics';
import {
BoundCounter,
BoundObserver,
BoundValueRecorder,
Metric,
SumAggregator,
} from '@opentelemetry/metrics';
import { hrTimeToNanoseconds } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import * as api from '@opentelemetry/api';
describe('transformMetrics', () => {
describe('toCollectorMetric', async () => {
const counter: MetricRecord = await mockCounter();
const doubleCounter: MetricRecord = await mockDoubleCounter();
const observer: MetricRecord = await mockObserver();
const recorder: MetricRecord = await mockValueRecorder();
let counter: Metric<BoundCounter> & api.Counter;
let doubleCounter: Metric<BoundCounter> & api.Counter;
let observer: Metric<BoundObserver> & api.ValueObserver;
let sumObserver: Metric<BoundObserver> & api.SumObserver;
let upDownSumObserver: Metric<BoundObserver> & api.UpDownSumObserver;
let recorder: Metric<BoundValueRecorder> & api.ValueRecorder;
beforeEach(() => {
counter = mockCounter();
doubleCounter = mockDoubleCounter();
let count1 = 0;
let count2 = 0;
let count3 = 0;
function getValue(count: number) {
if (count % 2 == 0) {
return 3;
}
return -1;
}
observer = mockObserver(observerResult => {
count1++;
observerResult.observe(getValue(count1), {});
});
sumObserver = mockSumObserver(observerResult => {
count2++;
observerResult.observe(getValue(count2), {});
});
upDownSumObserver = mockUpDownSumObserver(observerResult => {
count3++;
observerResult.observe(getValue(count3), {});
});
recorder = mockValueRecorder();
// Counter
counter.aggregator.update(1);
counter.add(1);
// Double Counter
doubleCounter.aggregator.update(8);
// Observer
observer.aggregator.update(3);
observer.aggregator.update(6);
doubleCounter.add(8);
// ValueRecorder
recorder.aggregator.update(7);
recorder.aggregator.update(14);
recorder.record(7);
recorder.record(14);
});
it('should convert metric', () => {
it('should convert metric', async () => {
const counterMetric = (await counter.getMetricRecord())[0];
ensureCounterIsCorrect(
transform.toCollectorMetric(counter, 1592602232694000000),
hrTimeToNanoseconds(counter.aggregator.toPoint().timestamp)
);
ensureObserverIsCorrect(
transform.toCollectorMetric(observer, 1592602232694000000),
hrTimeToNanoseconds(observer.aggregator.toPoint().timestamp)
transform.toCollectorMetric(counterMetric, 1592602232694000000),
hrTimeToNanoseconds(await counterMetric.aggregator.toPoint().timestamp)
);
const doubleCounterMetric = (await doubleCounter.getMetricRecord())[0];
ensureDoubleCounterIsCorrect(
transform.toCollectorMetric(doubleCounterMetric, 1592602232694000000),
hrTimeToNanoseconds(doubleCounterMetric.aggregator.toPoint().timestamp)
);
await observer.getMetricRecord();
await observer.getMetricRecord();
const observerMetric = (await observer.getMetricRecord())[0];
ensureObserverIsCorrect(
transform.toCollectorMetric(observerMetric, 1592602232694000000),
hrTimeToNanoseconds(observerMetric.aggregator.toPoint().timestamp),
-1
);
// collect 3 times
await sumObserver.getMetricRecord();
await sumObserver.getMetricRecord();
const sumObserverMetric = (await sumObserver.getMetricRecord())[0];
ensureSumObserverIsCorrect(
transform.toCollectorMetric(sumObserverMetric, 1592602232694000000),
hrTimeToNanoseconds(sumObserverMetric.aggregator.toPoint().timestamp),
3
);
// collect 3 times
await upDownSumObserver.getMetricRecord();
await upDownSumObserver.getMetricRecord();
const upDownSumObserverMetric = (
await upDownSumObserver.getMetricRecord()
)[0];
ensureUpDownSumObserverIsCorrect(
transform.toCollectorMetric(
upDownSumObserverMetric,
1592602232694000000
),
hrTimeToNanoseconds(
upDownSumObserverMetric.aggregator.toPoint().timestamp
),
-1
);
const recorderMetric = (await recorder.getMetricRecord())[0];
ensureValueRecorderIsCorrect(
transform.toCollectorMetric(recorder, 1592602232694000000),
hrTimeToNanoseconds(recorder.aggregator.toPoint().timestamp),
transform.toCollectorMetric(recorderMetric, 1592602232694000000),
hrTimeToNanoseconds(recorderMetric.aggregator.toPoint().timestamp),
[0, 100],
[0, 2, 0]
);
ensureDoubleCounterIsCorrect(
transform.toCollectorMetric(doubleCounter, 1592602232694000000),
hrTimeToNanoseconds(doubleCounter.aggregator.toPoint().timestamp)
);
});
it('should convert metric labels value to string', () => {
@ -104,7 +178,11 @@ describe('transformMetrics', () => {
it('should group by resource', async () => {
const [resource1, resource2] = mockedResources;
const [library] = mockedInstrumentationLibraries;
const [metric1, metric2, metric3] = await multiResourceMetricsGet();
const [metric1, metric2, metric3] = multiResourceMetricsGet(
observerResult => {
observerResult.observe(1, {});
}
);
const expected = new Map([
[resource1, new Map([[library, [metric1, metric3]]])],
@ -112,7 +190,9 @@ describe('transformMetrics', () => {
]);
const result = transform.groupMetricsByResourceAndLibrary(
await multiResourceMetricsGet()
multiResourceMetricsGet(observerResult => {
observerResult.observe(1, {});
})
);
assert.deepStrictEqual(result, expected);
@ -125,7 +205,7 @@ describe('transformMetrics', () => {
metric1,
metric2,
metric3,
] = await multiInstrumentationLibraryMetricsGet();
] = multiInstrumentationLibraryMetricsGet(observerResult => {});
const expected = new Map([
[
resource,
@ -137,7 +217,7 @@ describe('transformMetrics', () => {
]);
const result = transform.groupMetricsByResourceAndLibrary(
await multiInstrumentationLibraryMetricsGet()
multiInstrumentationLibraryMetricsGet(observerResult => {})
);
assert.deepStrictEqual(result, expected);

View File

@ -14,16 +14,16 @@
* limitations under the License.
*/
import { TraceFlags, ValueType, StatusCode } from '@opentelemetry/api';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { ReadableSpan } from '@opentelemetry/tracing';
import { Resource } from '@opentelemetry/resources';
import { MetricRecord, MeterProvider } from '@opentelemetry/metrics';
import { hexToBase64, InstrumentationLibrary } from '@opentelemetry/core';
import * as assert from 'assert';
import { opentelemetryProto } from '../src/types';
import * as collectorTypes from '../src/types';
const meterProvider = new MeterProvider({
const meterProvider = new metrics.MeterProvider({
interval: 30000,
resource: new Resource({
service: 'ui',
@ -42,61 +42,104 @@ if (typeof Buffer === 'undefined') {
};
}
export async function mockCounter(): Promise<MetricRecord> {
export function mockCounter(): metrics.Metric<metrics.BoundCounter> &
api.Counter {
const name = 'int-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockDoubleCounter(): Promise<MetricRecord> {
export function mockDoubleCounter(): metrics.Metric<metrics.BoundCounter> &
api.Counter {
const name = 'double-counter';
const metric =
meter['_metrics'].get(name) ||
meter.createCounter(name, {
description: 'sample counter description',
valueType: ValueType.DOUBLE,
valueType: api.ValueType.DOUBLE,
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockObserver(): Promise<MetricRecord> {
const name = 'double-observer';
export function mockObserver(
callback: (observerResult: api.ObserverResult) => unknown,
name = 'double-observer'
): metrics.Metric<metrics.BoundObserver> & api.ValueObserver {
const metric =
meter['_metrics'].get(name) ||
meter.createValueObserver(name, {
description: 'sample observer description',
valueType: ValueType.DOUBLE,
});
meter.createValueObserver(
name,
{
description: 'sample observer description',
valueType: api.ValueType.DOUBLE,
},
callback
);
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
export async function mockValueRecorder(): Promise<MetricRecord> {
export function mockSumObserver(
callback: (observerResult: api.ObserverResult) => unknown,
name = 'double-sum-observer'
): metrics.Metric<metrics.BoundObserver> & api.SumObserver {
const metric =
meter['_metrics'].get(name) ||
meter.createSumObserver(
name,
{
description: 'sample sum observer description',
valueType: api.ValueType.DOUBLE,
},
callback
);
metric.clear();
metric.bind({});
return metric;
}
export function mockUpDownSumObserver(
callback: (observerResult: api.ObserverResult) => unknown,
name = 'double-up-down-sum-observer'
): metrics.Metric<metrics.BoundObserver> & api.UpDownSumObserver {
const metric =
meter['_metrics'].get(name) ||
meter.createUpDownSumObserver(
name,
{
description: 'sample up down sum observer description',
valueType: api.ValueType.DOUBLE,
},
callback
);
metric.clear();
metric.bind({});
return metric;
}
export function mockValueRecorder(): metrics.Metric<metrics.BoundValueRecorder> &
api.ValueRecorder {
const name = 'int-recorder';
const metric =
meter['_metrics'].get(name) ||
meter.createValueRecorder(name, {
description: 'sample recorder description',
valueType: ValueType.INT,
valueType: api.ValueType.INT,
boundaries: [0, 100],
});
metric.clear();
metric.bind({});
return (await metric.getMetricRecord())[0];
return metric;
}
const traceIdHex = '1f1008dc8e270e85c40a0d7c3939b278';
@ -109,13 +152,13 @@ export const mockedReadableSpan: ReadableSpan = {
spanContext: {
traceId: '1f1008dc8e270e85c40a0d7c3939b278',
spanId: '5e107261f64fa53e',
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: '78a8915098864388',
startTime: [1574120165, 429803070],
endTime: [1574120165, 438688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: { component: 'document-load' },
links: [
{
@ -180,13 +223,13 @@ export const basicTrace: ReadableSpan[] = [
spanContext: {
traceId: '1f1008dc8e270e85c40a0d7c3939b278',
spanId: '5e107261f64fa53e',
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: '78a8915098864388',
startTime: [1574120165, 429803070],
endTime: [1574120165, 438688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: {},
links: [],
events: [],
@ -200,13 +243,13 @@ export const basicTrace: ReadableSpan[] = [
spanContext: {
traceId: '1f1008dc8e270e85c40a0d7c3939b278',
spanId: 'f64fa53e5e107261',
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: '78a8915098864388',
startTime: [1575120165, 439803070],
endTime: [1575120165, 448688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: {},
links: [],
events: [],
@ -220,13 +263,13 @@ export const basicTrace: ReadableSpan[] = [
spanContext: {
traceId: '1f1008dc8e270e85c40a0d7c3939b278',
spanId: '07261f64fa53e5e1',
traceFlags: TraceFlags.SAMPLED,
traceFlags: api.TraceFlags.SAMPLED,
},
parentSpanId: 'a891578098864388',
startTime: [1575120165, 439803070],
endTime: [1575120165, 448688070],
ended: true,
status: { code: StatusCode.OK },
status: { code: api.StatusCode.OK },
attributes: {},
links: [],
events: [],
@ -251,44 +294,44 @@ export const multiResourceTrace: ReadableSpan[] = [
},
];
export const multiResourceMetricsGet = async function (): Promise<
MetricRecord[]
> {
export const multiResourceMetricsGet = function (
callback: (observerResult: api.ObserverResult) => unknown
): any[] {
return [
{
...(await mockCounter()),
...mockCounter(),
resource: mockedResources[0],
instrumentationLibrary: mockedInstrumentationLibraries[0],
},
{
...(await mockObserver()),
...mockObserver(callback),
resource: mockedResources[1],
instrumentationLibrary: mockedInstrumentationLibraries[0],
},
{
...(await mockCounter()),
...mockCounter(),
resource: mockedResources[0],
instrumentationLibrary: mockedInstrumentationLibraries[0],
},
];
};
export const multiInstrumentationLibraryMetricsGet = async function (): Promise<
MetricRecord[]
> {
export const multiInstrumentationLibraryMetricsGet = function (
callback: (observerResult: api.ObserverResult) => unknown
): any[] {
return [
{
...(await mockCounter()),
...mockCounter(),
resource: mockedResources[0],
instrumentationLibrary: mockedInstrumentationLibraries[0],
},
{
...(await mockObserver()),
...mockObserver(callback),
resource: mockedResources[0],
instrumentationLibrary: mockedInstrumentationLibraries[1],
},
{
...(await mockCounter()),
...mockCounter(),
resource: mockedResources[0],
instrumentationLibrary: mockedInstrumentationLibraries[0],
},
@ -464,7 +507,7 @@ export function ensureSpanIsCorrect(
assert.strictEqual(span.droppedLinksCount, 0, 'droppedLinksCount is wrong');
assert.deepStrictEqual(
span.status,
{ code: StatusCode.OK },
{ code: api.StatusCode.OK },
'status is wrong'
);
}
@ -555,17 +598,19 @@ export function ensureDoubleCounterIsCorrect(
export function ensureObserverIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric,
time: number
time: number,
value: number,
name = 'double-observer'
) {
assert.deepStrictEqual(metric, {
name: 'double-observer',
name,
description: 'sample observer description',
unit: '1',
doubleGauge: {
dataPoints: [
{
labels: [],
value: 6,
value,
startTimeUnixNano: 1592602232694000000,
timeUnixNano: time,
},
@ -574,6 +619,60 @@ export function ensureObserverIsCorrect(
});
}
export function ensureSumObserverIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric,
time: number,
value: number,
name = 'double-sum-observer'
) {
assert.deepStrictEqual(metric, {
name,
description: 'sample sum observer description',
unit: '1',
doubleSum: {
isMonotonic: true,
dataPoints: [
{
labels: [],
value,
startTimeUnixNano: 1592602232694000000,
timeUnixNano: time,
},
],
aggregationTemporality:
collectorTypes.opentelemetryProto.metrics.v1.AggregationTemporality
.AGGREGATION_TEMPORALITY_CUMULATIVE,
},
});
}
export function ensureUpDownSumObserverIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric,
time: number,
value: number,
name = 'double-up-down-sum-observer'
) {
assert.deepStrictEqual(metric, {
name,
description: 'sample up down sum observer description',
unit: '1',
doubleSum: {
isMonotonic: false,
dataPoints: [
{
labels: [],
value,
startTimeUnixNano: 1592602232694000000,
timeUnixNano: time,
},
],
aggregationTemporality:
collectorTypes.opentelemetryProto.metrics.v1.AggregationTemporality
.AGGREGATION_TEMPORALITY_CUMULATIVE,
},
});
}
export function ensureValueRecorderIsCorrect(
metric: collectorTypes.opentelemetryProto.metrics.v1.Metric,
time: number,

View File

@ -14,12 +14,7 @@
* limitations under the License.
*/
import {
ConsoleLogger,
ExportResult,
ExportResultCode,
LogLevel,
} from '@opentelemetry/core';
import * as api from '@opentelemetry/api';
import * as core from '@opentelemetry/core';
import * as http from 'http';
import * as assert from 'assert';
@ -37,7 +32,13 @@ import {
ensureValueRecorderIsCorrect,
ensureObserverIsCorrect,
} from '../helper';
import { MetricRecord } from '@opentelemetry/metrics';
import {
BoundCounter,
BoundObserver,
BoundValueRecorder,
Metric,
MetricRecord,
} from '@opentelemetry/metrics';
const fakeRequest = {
end: function () {},
@ -56,7 +57,7 @@ describe('CollectorMetricExporter - node with json over http', () => {
describe('instance', () => {
it('should warn about metadata when using json', () => {
const metadata = 'foo';
const logger = new ConsoleLogger(LogLevel.DEBUG);
const logger = new core.ConsoleLogger(core.LogLevel.DEBUG);
const spyLoggerWarn = sinon.stub(logger, 'warn');
collectorExporter = new CollectorMetricExporter({
logger,
@ -89,14 +90,22 @@ describe('CollectorMetricExporter - node with json over http', () => {
value: 1592602232694000000,
});
metrics = [];
metrics.push(await mockCounter());
metrics.push(await mockObserver());
metrics.push(await mockValueRecorder());
metrics[0].aggregator.update(1);
metrics[1].aggregator.update(3);
metrics[1].aggregator.update(6);
metrics[2].aggregator.update(7);
metrics[2].aggregator.update(14);
const counter: Metric<BoundCounter> & api.Counter = mockCounter();
const observer: Metric<BoundObserver> & api.ValueObserver = mockObserver(
observerResult => {
observerResult.observe(6, {});
},
'double-observer2'
);
const recorder: Metric<BoundValueRecorder> &
api.ValueRecorder = mockValueRecorder();
counter.add(1);
recorder.record(7);
recorder.record(14);
metrics.push((await counter.getMetricRecord())[0]);
metrics.push((await observer.getMetricRecord())[0]);
metrics.push((await recorder.getMetricRecord())[0]);
});
afterEach(() => {
@ -152,7 +161,9 @@ describe('CollectorMetricExporter - node with json over http', () => {
assert.ok(typeof metric2 !== 'undefined', "observer doesn't exist");
ensureObserverIsCorrect(
metric2,
core.hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp)
core.hrTimeToNanoseconds(metrics[1].aggregator.toPoint().timestamp),
6,
'double-observer2'
);
assert.ok(typeof metric3 !== 'undefined', "histogram doesn't exist");
ensureValueRecorderIsCorrect(
@ -184,7 +195,7 @@ describe('CollectorMetricExporter - node with json over http', () => {
assert.strictEqual(spyLoggerError.args.length, 0);
assert.strictEqual(
responseSpy.args[0][0].code,
ExportResultCode.SUCCESS
core.ExportResultCode.SUCCESS
);
done();
});
@ -211,8 +222,8 @@ describe('CollectorMetricExporter - node with json over http', () => {
callback(mockRes);
mockRes.send('failed');
setTimeout(() => {
const result = responseSpy.args[0][0] as ExportResult;
assert.strictEqual(result.code, ExportResultCode.FAILED);
const result = responseSpy.args[0][0] as core.ExportResult;
assert.strictEqual(result.code, core.ExportResultCode.FAILED);
const error = result.error as collectorTypes.CollectorExporterError;
assert.ok(error !== undefined);
assert.strictEqual(error.code, 400);

View File

@ -14,12 +14,6 @@
* limitations under the License.
*/
import {
ConsoleLogger,
ExportResultCode,
ExportResult,
LogLevel,
} from '@opentelemetry/core';
import * as core from '@opentelemetry/core';
import { ReadableSpan } from '@opentelemetry/tracing';
import * as http from 'http';
@ -53,7 +47,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
describe('instance', () => {
it('should warn about metadata when using json', () => {
const metadata = 'foo';
const logger = new ConsoleLogger(LogLevel.DEBUG);
const logger = new core.ConsoleLogger(core.LogLevel.DEBUG);
const spyLoggerWarn = sinon.stub(logger, 'warn');
collectorExporter = new CollectorTraceExporter({
logger,
@ -150,7 +144,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
assert.strictEqual(spyLoggerError.args.length, 0);
assert.strictEqual(
responseSpy.args[0][0].code,
ExportResultCode.SUCCESS
core.ExportResultCode.SUCCESS
);
done();
});
@ -168,8 +162,8 @@ describe('CollectorTraceExporter - node with json over http', () => {
callback(mockResError);
mockResError.send('failed');
setTimeout(() => {
const result = responseSpy.args[0][0] as ExportResult;
assert.strictEqual(result.code, ExportResultCode.FAILED);
const result = responseSpy.args[0][0] as core.ExportResult;
assert.strictEqual(result.code, core.ExportResultCode.FAILED);
const error = result.error as collectorTypes.CollectorExporterError;
assert.ok(error !== undefined);
assert.strictEqual(error.code, 400);

View File

@ -14,15 +14,15 @@
* limitations under the License.
*/
import {
Batcher,
MetricDescriptor,
Aggregator,
MetricDescriptor,
MetricRecord,
Processor,
} from '@opentelemetry/metrics';
type Constructor<T, R extends Aggregator> = new (...args: T[]) => R;
export class ExactBatcher<T, R extends Aggregator> extends Batcher {
export class ExactProcessor<T, R extends Aggregator> extends Processor {
private readonly args: ConstructorParameters<Constructor<T, R>>;
public aggregators: R[] = [];

View File

@ -202,13 +202,13 @@ describe('PrometheusExporter', () => {
const boundCounter = counter.bind({ key1: 'labelValue1' });
boundCounter.add(10);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
// TODO: Remove this special case once the PR is ready.
// This is to test the special case where counters are destroyed
// and recreated in the exporter in order to get around prom-client's
// aggregation and use ours.
boundCounter.add(10);
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -255,8 +255,8 @@ describe('PrometheusExporter', () => {
);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -286,7 +286,7 @@ describe('PrometheusExporter', () => {
counter.bind({ counterKey1: 'labelValue1' }).add(10);
counter.bind({ counterKey1: 'labelValue2' }).add(20);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -363,7 +363,7 @@ describe('PrometheusExporter', () => {
const boundCounter = counter.bind({ key1: 'labelValue1' });
boundCounter.add(10);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -390,7 +390,7 @@ describe('PrometheusExporter', () => {
const boundCounter = counter.bind({ key1: 'labelValue1' });
boundCounter.add(10);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -419,7 +419,7 @@ describe('PrometheusExporter', () => {
counter.bind({ key1: 'labelValue1' }).add(20);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -456,7 +456,7 @@ describe('PrometheusExporter', () => {
);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -465,7 +465,7 @@ describe('PrometheusExporter', () => {
assert.deepStrictEqual(lines, [
'# HELP sum_observer a test description',
'# TYPE sum_observer counter',
'# TYPE sum_observer gauge',
`sum_observer{key1="labelValue1"} 20 ${mockedHrTimeMs}`,
'',
]);
@ -496,7 +496,7 @@ describe('PrometheusExporter', () => {
);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -526,7 +526,7 @@ describe('PrometheusExporter', () => {
valueRecorder.bind({ key1: 'labelValue1' }).record(20);
meter.collect().then(() => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
exporter.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -578,7 +578,7 @@ describe('PrometheusExporter', () => {
},
async () => {
await meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
exporter!.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
@ -608,7 +608,7 @@ describe('PrometheusExporter', () => {
},
async () => {
await meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
exporter!.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:8080/metrics', res => {
res.on('data', chunk => {
@ -638,7 +638,7 @@ describe('PrometheusExporter', () => {
},
async () => {
await meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
exporter!.export(meter.getProcessor().checkPointSet(), () => {
http
.get('http://localhost:9464/test', res => {
res.on('data', chunk => {

View File

@ -27,7 +27,7 @@ import * as assert from 'assert';
import { Labels } from '@opentelemetry/api';
import { PrometheusSerializer } from '../src/PrometheusSerializer';
import { PrometheusLabelsBatcher } from '../src/PrometheusLabelsBatcher';
import { ExactBatcher } from './ExactBatcher';
import { ExactProcessor } from './ExactProcessor';
import { mockedHrTimeMs, mockAggregator } from './util';
const labels = {
@ -51,7 +51,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createCounter('test') as CounterMetric;
counter.bind(labels).add(1);
@ -73,7 +73,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer(undefined, false);
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createCounter('test') as CounterMetric;
counter.bind(labels).add(1);
@ -96,7 +96,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(LastValueAggregator),
processor: new ExactProcessor(LastValueAggregator),
}).getMeter('test');
const observer = meter.createValueObserver(
'test',
@ -123,7 +123,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer(undefined, false);
const meter = new MeterProvider({
batcher: new ExactBatcher(LastValueAggregator),
processor: new ExactProcessor(LastValueAggregator),
}).getMeter('test');
const observer = meter.createValueObserver(
'test',
@ -150,8 +150,8 @@ describe('PrometheusSerializer', () => {
it('should serialize metric record with sum aggregator', async () => {
const serializer = new PrometheusSerializer();
const batcher = new ExactBatcher(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ batcher }).getMeter('test');
const processor = new ExactProcessor(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ processor }).getMeter('test');
const recorder = meter.createValueRecorder('test', {
description: 'foobar',
}) as ValueRecorderMetric;
@ -206,8 +206,8 @@ describe('PrometheusSerializer', () => {
it('serialize metric record with sum aggregator without timestamp', async () => {
const serializer = new PrometheusSerializer(undefined, false);
const batcher = new ExactBatcher(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ batcher }).getMeter('test');
const processor = new ExactProcessor(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ processor }).getMeter('test');
const recorder = meter.createValueRecorder('test', {
description: 'foobar',
}) as ValueRecorderMetric;
@ -241,9 +241,9 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const batcher = new PrometheusLabelsBatcher();
const processor = new PrometheusLabelsBatcher();
const counter = meter.createCounter('test', {
description: 'foobar',
}) as CounterMetric;
@ -251,8 +251,8 @@ describe('PrometheusSerializer', () => {
counter.bind({ val: '2' }).add(1);
const records = await counter.getMetricRecord();
records.forEach(it => batcher.process(it));
const checkPointSet = batcher.checkPointSet();
records.forEach(it => processor.process(it));
const checkPointSet = processor.checkPointSet();
const result = serializer.serialize(checkPointSet);
assert.strictEqual(
@ -268,9 +268,9 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer(undefined, false);
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const batcher = new PrometheusLabelsBatcher();
const processor = new PrometheusLabelsBatcher();
const counter = meter.createCounter('test', {
description: 'foobar',
}) as CounterMetric;
@ -278,8 +278,8 @@ describe('PrometheusSerializer', () => {
counter.bind({ val: '2' }).add(1);
const records = await counter.getMetricRecord();
records.forEach(it => batcher.process(it));
const checkPointSet = batcher.checkPointSet();
records.forEach(it => processor.process(it));
const checkPointSet = processor.checkPointSet();
const result = serializer.serialize(checkPointSet);
assert.strictEqual(
@ -299,9 +299,9 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(LastValueAggregator),
processor: new ExactProcessor(LastValueAggregator),
}).getMeter('test');
const batcher = new PrometheusLabelsBatcher();
const processor = new PrometheusLabelsBatcher();
const observer = meter.createValueObserver(
'test',
{
@ -313,8 +313,8 @@ describe('PrometheusSerializer', () => {
) as ValueObserverMetric;
await meter.collect();
const records = await observer.getMetricRecord();
records.forEach(it => batcher.process(it));
const checkPointSet = batcher.checkPointSet();
records.forEach(it => processor.process(it));
const checkPointSet = processor.checkPointSet();
const result = serializer.serialize(checkPointSet);
assert.strictEqual(
@ -332,8 +332,8 @@ describe('PrometheusSerializer', () => {
it('serialize metric record with HistogramAggregator aggregator, cumulative', async () => {
const serializer = new PrometheusSerializer();
const batcher = new ExactBatcher(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ batcher }).getMeter('test');
const processor = new ExactProcessor(HistogramAggregator, [1, 10, 100]);
const meter = new MeterProvider({ processor }).getMeter('test');
const recorder = meter.createValueRecorder('test', {
description: 'foobar',
}) as ValueRecorderMetric;
@ -378,7 +378,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createCounter('test') as CounterMetric;
counter.bind({}).add(1);
@ -397,7 +397,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createCounter('test') as CounterMetric;
counter
@ -431,7 +431,7 @@ describe('PrometheusSerializer', () => {
for (const esac of cases) {
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createUpDownCounter(
'test'
@ -455,7 +455,7 @@ describe('PrometheusSerializer', () => {
const serializer = new PrometheusSerializer();
const meter = new MeterProvider({
batcher: new ExactBatcher(SumAggregator),
processor: new ExactProcessor(SumAggregator),
}).getMeter('test');
const counter = meter.createCounter('test') as CounterMetric;
counter

View File

@ -17,7 +17,7 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BoundObserver } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind, MetricRecord } from './export/types';
import { Metric } from './Metric';
import { ObserverResult } from './ObserverResult';
@ -36,7 +36,7 @@ export abstract class BaseObserverMetric
constructor(
name: string,
options: api.MetricOptions,
private readonly _batcher: Batcher,
private readonly _processor: Processor,
resource: Resource,
metricKind: MetricKind,
instrumentationLibrary: InstrumentationLibrary,
@ -52,22 +52,24 @@ export abstract class BaseObserverMetric
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
this._processor.aggregatorFor(this._descriptor)
);
}
protected createObserverResult(): ObserverResult {
return new ObserverResult();
async getMetricRecord(): Promise<MetricRecord[]> {
const observerResult = new ObserverResult();
await this._callback(observerResult);
this._processResults(observerResult);
return super.getMetricRecord();
}
async getMetricRecord(): Promise<MetricRecord[]> {
const observerResult = this.createObserverResult();
await this._callback(observerResult);
protected _processResults(observerResult: ObserverResult) {
observerResult.values.forEach((value, labels) => {
const instrument = this.bind(labels);
instrument.update(value);
});
return super.getMetricRecord();
}
observation(value: number) {

View File

@ -19,7 +19,7 @@ import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BatchObserverResult } from './BatchObserverResult';
import { BoundObserver } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind, MetricRecord } from './export/types';
import { Metric } from './Metric';
@ -36,7 +36,7 @@ export class BatchObserverMetric
constructor(
name: string,
options: api.BatchMetricOptions,
private readonly _batcher: Batcher,
private readonly _processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.BatchObserverResult) => void
@ -59,7 +59,7 @@ export class BatchObserverMetric
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
this._processor.aggregatorFor(this._descriptor)
);
}

View File

@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BoundCounter } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
import { Metric } from './Metric';
@ -27,7 +27,7 @@ export class CounterMetric extends Metric<BoundCounter> implements api.Counter {
constructor(
name: string,
options: api.MetricOptions,
private readonly _batcher: Batcher,
private readonly _processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary
) {
@ -39,8 +39,7 @@ export class CounterMetric extends Metric<BoundCounter> implements api.Counter {
this._disabled,
this._valueType,
this._logger,
// @todo: consider to set to CounterSumAggregator always.
this._batcher.aggregatorFor(this._descriptor)
this._processor.aggregatorFor(this._descriptor)
);
}

View File

@ -19,6 +19,7 @@ import { ConsoleLogger, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BatchObserverMetric } from './BatchObserverMetric';
import { BaseBoundInstrument } from './BoundInstrument';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
import { UpDownCounterMetric } from './UpDownCounterMetric';
import { CounterMetric } from './CounterMetric';
@ -28,7 +29,7 @@ import { Metric } from './Metric';
import { ValueObserverMetric } from './ValueObserverMetric';
import { SumObserverMetric } from './SumObserverMetric';
import { DEFAULT_METRIC_OPTIONS, DEFAULT_CONFIG, MeterConfig } from './types';
import { Batcher, UngroupedBatcher } from './export/Batcher';
import { UngroupedProcessor } from './export/Processor';
import { PushController } from './export/Controller';
import { NoopExporter } from './export/NoopExporter';
@ -38,7 +39,7 @@ import { NoopExporter } from './export/NoopExporter';
export class Meter implements api.Meter {
private readonly _logger: api.Logger;
private readonly _metrics = new Map<string, Metric<BaseBoundInstrument>>();
private readonly _batcher: Batcher;
private readonly _processor: Processor;
private readonly _resource: Resource;
private readonly _instrumentationLibrary: InstrumentationLibrary;
private readonly _controller: PushController;
@ -53,7 +54,7 @@ export class Meter implements api.Meter {
config: MeterConfig = DEFAULT_CONFIG
) {
this._logger = config.logger || new ConsoleLogger(config.logLevel);
this._batcher = config.batcher ?? new UngroupedBatcher();
this._processor = config.processor ?? new UngroupedProcessor();
this._resource = config.resource || Resource.createTelemetrySDKResource();
this._instrumentationLibrary = instrumentationLibrary;
// start the push controller
@ -86,7 +87,7 @@ export class Meter implements api.Meter {
const valueRecorder = new ValueRecorderMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary
);
@ -116,7 +117,7 @@ export class Meter implements api.Meter {
const counter = new CounterMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary
);
@ -152,7 +153,7 @@ export class Meter implements api.Meter {
const upDownCounter = new UpDownCounterMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary
);
@ -185,7 +186,7 @@ export class Meter implements api.Meter {
const valueObserver = new ValueObserverMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary,
callback
@ -213,7 +214,7 @@ export class Meter implements api.Meter {
const sumObserver = new SumObserverMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary,
callback
@ -247,7 +248,7 @@ export class Meter implements api.Meter {
const upDownSumObserver = new UpDownSumObserverMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary,
callback
@ -281,7 +282,7 @@ export class Meter implements api.Meter {
const batchObserver = new BatchObserverMetric(
name,
opt,
this._batcher,
this._processor,
this._resource,
this._instrumentationLibrary,
callback
@ -293,7 +294,7 @@ export class Meter implements api.Meter {
/**
* Collects all the metrics created with this `Meter` for export.
*
* Utilizes the batcher to create checkpoints of the current values in
* Utilizes the processor to create checkpoints of the current values in
* each aggregator belonging to the metrics that were created with this
* meter instance.
*/
@ -308,7 +309,7 @@ export class Meter implements api.Meter {
});
await Promise.all(batchObservers).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._batcher.process(metric));
metrics.forEach(metric => this._processor.process(metric));
});
});
@ -323,13 +324,13 @@ export class Meter implements api.Meter {
await Promise.all(metrics).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._batcher.process(metric));
metrics.forEach(metric => this._processor.process(metric));
});
});
}
getBatcher(): Batcher {
return this._batcher;
getProcessor(): Processor {
return this._processor;
}
shutdown(): Promise<void> {

View File

@ -1,25 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Labels } from '@opentelemetry/api';
import { ObserverResult } from './ObserverResult';
export class MonotonicObserverResult extends ObserverResult {
observe(value: number, labels: Labels): void {
if (value >= 0) {
this.values.set(labels, value);
}
}
}

View File

@ -18,10 +18,9 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BaseObserverMetric } from './BaseObserverMetric';
import { Processor } from './export/Processor';
import { LastValue, MetricKind } from './export/types';
import { ObserverResult } from './ObserverResult';
import { MonotonicObserverResult } from './MonotonicObserverResult';
import { Batcher } from './export/Batcher';
import { MetricKind } from './export/types';
/** This is a SDK implementation of SumObserver Metric. */
export class SumObserverMetric
@ -30,7 +29,7 @@ export class SumObserverMetric
constructor(
name: string,
options: api.MetricOptions,
batcher: Batcher,
processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.ObserverResult) => unknown
@ -38,7 +37,7 @@ export class SumObserverMetric
super(
name,
options,
batcher,
processor,
resource,
MetricKind.SUM_OBSERVER,
instrumentationLibrary,
@ -46,7 +45,19 @@ export class SumObserverMetric
);
}
protected createObserverResult(): ObserverResult {
return new MonotonicObserverResult();
protected _processResults(observerResult: ObserverResult) {
observerResult.values.forEach((value, labels) => {
const instrument = this.bind(labels);
// SumObserver is monotonic which means it should only accept values
// greater or equal then previous value
const previous = instrument.getAggregator().toPoint();
let previousValue = -Infinity;
if (previous.timestamp[0] !== 0 || previous.timestamp[1] !== 0) {
previousValue = previous.value as LastValue;
}
if (value >= previousValue) {
instrument.update(value);
}
});
}
}

View File

@ -19,7 +19,7 @@ import { Resource } from '@opentelemetry/resources';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { BoundUpDownCounter } from './BoundInstrument';
import { MetricKind } from './export/types';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { Metric } from './Metric';
/** This is a SDK implementation of UpDownCounter Metric. */
@ -29,7 +29,7 @@ export class UpDownCounterMetric
constructor(
name: string,
options: api.MetricOptions,
private readonly _batcher: Batcher,
private readonly _processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary
) {
@ -47,7 +47,7 @@ export class UpDownCounterMetric
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
this._processor.aggregatorFor(this._descriptor)
);
}

View File

@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BaseObserverMetric } from './BaseObserverMetric';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
/** This is a SDK implementation of UpDownSumObserver Metric. */
@ -28,7 +28,7 @@ export class UpDownSumObserverMetric
constructor(
name: string,
options: api.MetricOptions,
batcher: Batcher,
processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.ObserverResult) => unknown
@ -36,7 +36,7 @@ export class UpDownSumObserverMetric
super(
name,
options,
batcher,
processor,
resource,
MetricKind.UP_DOWN_SUM_OBSERVER,
instrumentationLibrary,

View File

@ -17,7 +17,7 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BaseObserverMetric } from './BaseObserverMetric';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
/** This is a SDK implementation of Value Observer Metric. */
@ -27,7 +27,7 @@ export class ValueObserverMetric
constructor(
name: string,
options: api.MetricOptions,
batcher: Batcher,
processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.ObserverResult) => unknown
@ -35,7 +35,7 @@ export class ValueObserverMetric
super(
name,
options,
batcher,
processor,
resource,
MetricKind.VALUE_OBSERVER,
instrumentationLibrary,

View File

@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BoundValueRecorder } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
import { Metric } from './Metric';
@ -29,7 +29,7 @@ export class ValueRecorderMetric
constructor(
name: string,
options: api.MetricOptions,
private readonly _batcher: Batcher,
private readonly _processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary
) {
@ -48,7 +48,7 @@ export class ValueRecorderMetric
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
this._processor.aggregatorFor(this._descriptor)
);
}

View File

@ -51,7 +51,7 @@ export class PushController extends Controller {
await this._meter.collect();
return new Promise(resolve => {
this._exporter.export(
this._meter.getBatcher().checkPointSet(),
this._meter.getProcessor().checkPointSet(),
result => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(

View File

@ -23,13 +23,13 @@ import {
} from './types';
/**
* Base class for all batcher types.
* Base class for all processor types.
*
* The batcher is responsible for storing the aggregators and aggregated
* The processor is responsible for storing the aggregators and aggregated
* values received from updates from metrics in the meter. The stored values
* will be sent to an exporter for exporting.
*/
export abstract class Batcher {
export abstract class Processor {
protected readonly _batchMap = new Map<string, MetricRecord>();
/** Returns an aggregator based off metric descriptor. */
@ -44,23 +44,26 @@ export abstract class Batcher {
}
/**
* Batcher which retains all dimensions/labels. It accepts all records and
* Processor which retains all dimensions/labels. It accepts all records and
* passes them for exporting.
*/
export class UngroupedBatcher extends Batcher {
export class UngroupedProcessor extends Processor {
aggregatorFor(metricDescriptor: MetricDescriptor): Aggregator {
switch (metricDescriptor.metricKind) {
case MetricKind.COUNTER:
case MetricKind.UP_DOWN_COUNTER:
return new aggregators.SumAggregator();
case MetricKind.SUM_OBSERVER:
case MetricKind.UP_DOWN_SUM_OBSERVER:
return new aggregators.SumAggregator();
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();
case MetricKind.VALUE_RECORDER:
return new aggregators.HistogramAggregator(
metricDescriptor.boundaries || [Infinity]
);
case MetricKind.VALUE_OBSERVER:
return new aggregators.LastValueAggregator();
default:
return new aggregators.LastValueAggregator();
}

View File

@ -22,8 +22,8 @@ export * from './MeterProvider';
export * from './Metric';
export * from './ValueObserverMetric';
export * from './export/aggregators';
export * from './export/Batcher';
export * from './export/ConsoleMetricExporter';
export * from './export/Processor';
export * from './export/types';
export * from './UpDownCounterMetric';
export { MeterConfig } from './types';

View File

@ -16,9 +16,9 @@
import { LogLevel, getEnv } from '@opentelemetry/core';
import * as api from '@opentelemetry/api';
import { Processor } from './export/Processor';
import { MetricExporter } from './export/types';
import { Resource } from '@opentelemetry/resources';
import { Batcher } from './export/Batcher';
/** MeterConfig provides an interface for configuring a Meter. */
export interface MeterConfig {
@ -37,8 +37,8 @@ export interface MeterConfig {
/** Resource associated with metric telemetry */
resource?: Resource;
/** Metric batcher. */
batcher?: Batcher;
/** Metric Processor. */
processor?: Processor;
}
/** Default Meter configuration. */

View File

@ -41,7 +41,7 @@ import { SumObserverMetric } from '../src/SumObserverMetric';
import { Resource } from '@opentelemetry/resources';
import { UpDownSumObserverMetric } from '../src/UpDownSumObserverMetric';
import { hashLabels } from '../src/Utils';
import { Batcher } from '../src/export/Batcher';
import { Processor } from '../src/export/Processor';
import { ValueType } from '@opentelemetry/api';
const nonNumberValues = [
@ -105,7 +105,7 @@ describe('Meter', () => {
const counter = meter.createCounter('name') as CounterMetric;
counter.add(10, labels);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 10);
const lastTimestamp = record1.aggregator.toPoint().timestamp;
@ -130,7 +130,7 @@ describe('Meter', () => {
});
counter.add(1);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 1);
});
@ -162,7 +162,7 @@ describe('Meter', () => {
const boundCounter = counter.bind(labels);
boundCounter.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 10);
boundCounter.add(10);
@ -181,9 +181,9 @@ describe('Meter', () => {
const counter = meter.createCounter('name') as CounterMetric;
const boundCounter = counter.bind(labels);
boundCounter.add(10);
assert.strictEqual(meter.getBatcher().checkPointSet().length, 0);
assert.strictEqual(meter.getProcessor().checkPointSet().length, 0);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 10);
boundCounter.add(-100);
@ -197,7 +197,7 @@ describe('Meter', () => {
const boundCounter = counter.bind(labels);
boundCounter.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 0);
});
@ -208,7 +208,7 @@ describe('Meter', () => {
const boundCounter1 = counter.bind(labels);
boundCounter1.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 20);
assert.strictEqual(boundCounter, boundCounter1);
@ -253,7 +253,7 @@ describe('Meter', () => {
counter2.bind(labels).add(500);
await meter.collect();
const record = meter.getBatcher().checkPointSet();
const record = meter.getProcessor().checkPointSet();
assert.strictEqual(record.length, 1);
assert.deepStrictEqual(record[0].descriptor, {
@ -317,7 +317,7 @@ describe('Meter', () => {
const upDownCounter = meter.createUpDownCounter('name');
upDownCounter.add(10, labels);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 10);
const lastTimestamp = record1.aggregator.toPoint().timestamp;
@ -342,7 +342,7 @@ describe('Meter', () => {
});
upDownCounter.add(1);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 1);
});
@ -364,7 +364,7 @@ describe('Meter', () => {
const boundCounter = upDownCounter.bind(labels);
boundCounter.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 10);
boundCounter.add(-200);
@ -388,7 +388,7 @@ describe('Meter', () => {
const boundCounter = upDownCounter.bind(labels);
boundCounter.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 0);
});
@ -399,7 +399,7 @@ describe('Meter', () => {
const boundCounter1 = upDownCounter.bind(labels);
boundCounter1.add(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 20);
assert.strictEqual(boundCounter, boundCounter1);
@ -415,7 +415,7 @@ describe('Meter', () => {
boundCounter.add(val);
});
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 1);
});
@ -430,7 +430,7 @@ describe('Meter', () => {
// @ts-expect-error
boundCounter.add(val);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 0);
})
@ -448,7 +448,7 @@ describe('Meter', () => {
// @ts-expect-error
boundCounter.add(val);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.strictEqual(record1.aggregator.toPoint().value, 0);
})
@ -498,7 +498,7 @@ describe('Meter', () => {
counter2.bind(labels).add(500);
await meter.collect();
const record = meter.getBatcher().checkPointSet();
const record = meter.getProcessor().checkPointSet();
assert.strictEqual(record.length, 1);
assert.deepStrictEqual(record[0].descriptor, {
@ -570,7 +570,7 @@ describe('Meter', () => {
valueRecorder.record(200);
await meter.collect();
const [record] = meter.getBatcher().checkPointSet();
const [record] = meter.getProcessor().checkPointSet();
assert.deepStrictEqual(record.aggregator.toPoint().value as Histogram, {
buckets: {
boundaries: [10, 20, 30, 100],
@ -649,7 +649,7 @@ describe('Meter', () => {
boundValueRecorder.record(10);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.deepStrictEqual(
record1.aggregator.toPoint().value as Histogram,
{
@ -670,7 +670,7 @@ describe('Meter', () => {
boundValueRecorder.record(50);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.deepStrictEqual(
record1.aggregator.toPoint().value as Histogram,
{
@ -697,7 +697,7 @@ describe('Meter', () => {
const boundValueRecorder2 = valueRecorder.bind(labels);
boundValueRecorder2.record(100);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.deepStrictEqual(
record1.aggregator.toPoint().value as Histogram,
{
@ -723,7 +723,7 @@ describe('Meter', () => {
// @ts-expect-error
boundValueRecorder.record(val);
await meter.collect();
const [record1] = meter.getBatcher().checkPointSet();
const [record1] = meter.getProcessor().checkPointSet();
assert.deepStrictEqual(
record1.aggregator.toPoint().value as Histogram,
{
@ -801,10 +801,11 @@ describe('Meter', () => {
let counter = 0;
function getValue() {
console.log('getting value, counter:', counter);
if (++counter % 2 == 0) {
return -1;
return 3;
}
return 3;
return -1;
}
const sumObserver = meter.createSumObserver(
@ -826,7 +827,7 @@ describe('Meter', () => {
let metricRecords = await sumObserver.getMetricRecord();
assert.strictEqual(metricRecords.length, 1);
let point = metricRecords[0].aggregator.toPoint();
assert.strictEqual(point.value, 3);
assert.strictEqual(point.value, -1);
assert.strictEqual(
hashLabels(metricRecords[0].labels),
'|#core:1,pid:123'
@ -840,7 +841,7 @@ describe('Meter', () => {
metricRecords = await sumObserver.getMetricRecord();
assert.strictEqual(metricRecords.length, 1);
point = metricRecords[0].aggregator.toPoint();
assert.strictEqual(point.value, 6);
assert.strictEqual(point.value, 3);
});
it('should set callback and observe value when callback returns nothing', async () => {
@ -1024,7 +1025,7 @@ describe('Meter', () => {
function getValue() {
counter++;
if (counter % 2 === 0) {
return -1;
return 2;
}
return 3;
}
@ -1062,7 +1063,7 @@ describe('Meter', () => {
metricRecords = await upDownSumObserver.getMetricRecord();
assert.strictEqual(metricRecords.length, 1);
point = metricRecords[0].aggregator.toPoint();
assert.strictEqual(point.value, 5);
assert.strictEqual(point.value, 3);
});
it('should set callback and observe value when callback returns nothing', async () => {
@ -1215,7 +1216,7 @@ describe('Meter', () => {
);
await meter.collect();
const records = meter.getBatcher().checkPointSet();
const records = meter.getProcessor().checkPointSet();
assert.strictEqual(records.length, 8);
const metric1 = records[0];
@ -1317,7 +1318,7 @@ describe('Meter', () => {
boundCounter.add(10.45);
await meter.collect();
const record = meter.getBatcher().checkPointSet();
const record = meter.getProcessor().checkPointSet();
assert.strictEqual(record.length, 1);
assert.deepStrictEqual(record[0].descriptor, {
@ -1343,7 +1344,7 @@ describe('Meter', () => {
boundCounter.add(10.45);
await meter.collect();
const record = meter.getBatcher().checkPointSet();
const record = meter.getProcessor().checkPointSet();
assert.strictEqual(record.length, 1);
assert.deepStrictEqual(record[0].descriptor, {
@ -1359,9 +1360,9 @@ describe('Meter', () => {
});
});
it('should allow custom batcher', () => {
const customMeter = new MeterProvider().getMeter('custom-batcher', '*', {
batcher: new CustomBatcher(),
it('should allow custom processor', () => {
const customMeter = new MeterProvider().getMeter('custom-processor', '*', {
processor: new CustomProcessor(),
});
assert.throws(() => {
const valueRecorder = customMeter.createValueRecorder('myValueRecorder');
@ -1370,7 +1371,7 @@ describe('Meter', () => {
});
});
class CustomBatcher extends Batcher {
class CustomProcessor extends Processor {
process(record: MetricRecord): void {
throw new Error('process method not implemented.');
}

View File

@ -19,7 +19,7 @@ import * as api from '@opentelemetry/api';
import { NoopLogger } from '@opentelemetry/core';
import { Meter, MeterProvider } from '../src';
describe('Batcher', () => {
describe('Processor', () => {
describe('Ungrouped', () => {
let meter: Meter;
let fooCounter: api.BoundCounter;
@ -30,7 +30,7 @@ describe('Batcher', () => {
logger: new NoopLogger(),
interval: 10000,
}).getMeter('test-meter');
counter = meter.createCounter('ungrouped-batcher-test');
counter = meter.createCounter('ungrouped-processor-test');
fooCounter = counter.bind({ key: 'foo' });
barCounter = counter.bind({ key: 'bar' });
});
@ -40,7 +40,7 @@ describe('Batcher', () => {
barCounter.add(1);
barCounter.add(2);
await meter.collect();
const checkPointSet = meter.getBatcher().checkPointSet();
const checkPointSet = meter.getProcessor().checkPointSet();
assert.strictEqual(checkPointSet.length, 2);
for (const record of checkPointSet) {
switch (record.labels.key) {

View File

@ -50,7 +50,7 @@ describe('ConsoleMetricExporter', () => {
boundCounter.add(10);
await meter.collect();
consoleExporter.export(meter.getBatcher().checkPointSet(), () => {});
consoleExporter.export(meter.getProcessor().checkPointSet(), () => {});
assert.strictEqual(spyConsole.args.length, 3);
const [descriptor, labels, value] = spyConsole.args;
assert.deepStrictEqual(descriptor, [

View File

@ -100,9 +100,9 @@ Use a custom logger. Default: Logging disabled
Default: [INFO](../opentelemetry-core/src/common/types.ts#L19)
### metricBatcher
### metricProcessor
Use a custom batcher for metrics. Default: [UngroupedBatcher](../opentelemetry-metrics/src/export/Batcher.ts#L50)
Use a custom processor for metrics. Default: [UngroupedProcessor](../opentelemetry-metrics/src/export/Processor.ts#L50)
### metricExporter

View File

@ -89,8 +89,8 @@ export class NodeSDK {
if (configuration.metricExporter) {
const meterConfig: MeterConfig = {};
if (configuration.metricBatcher) {
meterConfig.batcher = configuration.metricBatcher;
if (configuration.metricProcessor) {
meterConfig.processor = configuration.metricProcessor;
}
if (configuration.metricExporter) {
meterConfig.exporter = configuration.metricExporter;

View File

@ -24,7 +24,7 @@ export interface NodeSDKConfiguration {
textMapPropagator: api.TextMapPropagator;
logger: api.Logger;
logLevel: core.LogLevel;
metricBatcher: metrics.Batcher;
metricProcessor: metrics.Processor;
metricExporter: metrics.MetricExporter;
metricInterval: number;
plugins: node.Plugins;