Jaeger no flush interval (#609)
* fix: remove flush timer from jaeger exporter * chore: remove deprecated flushInterval * feat: force flush on each export * chore: make export noop on empty span array * chore: do not flush empty batch span processor
This commit is contained in:
parent
9a91434cfc
commit
b4ab8a557c
|
|
@ -9,8 +9,6 @@ const tracer = new BasicTracer();
|
|||
const zipkinExporter = new ZipkinExporter({serviceName: 'basic-service'});
|
||||
const jaegerExporter = new JaegerExporter({
|
||||
serviceName: 'basic-service',
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
const collectorExporter = new CollectorExporter({serviceName: 'basic-service'});
|
||||
|
||||
|
|
|
|||
|
|
@ -27,8 +27,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,8 +25,6 @@ function setupTracerAndExporters(service) {
|
|||
})));
|
||||
tracer.addSpanProcessor(new SimpleSpanProcessor(new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
})));
|
||||
|
||||
// Initialize the OpenTelemetry APIs to use the BasicTracer bindings
|
||||
|
|
|
|||
|
|
@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
|
|||
} else {
|
||||
exporter = new JaegerExporter({
|
||||
serviceName: service,
|
||||
// The default flush interval is 5 seconds.
|
||||
flushInterval: 2000
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import * as jaegerTypes from './types';
|
|||
import { NoopLogger } from '@opentelemetry/core';
|
||||
import * as types from '@opentelemetry/types';
|
||||
import { spanToThrift } from './transform';
|
||||
import { unrefTimer } from '@opentelemetry/core';
|
||||
|
||||
/**
|
||||
* Format and sends span information to Jaeger Exporter.
|
||||
|
|
@ -29,17 +28,16 @@ export class JaegerExporter implements SpanExporter {
|
|||
private readonly _logger: types.Logger;
|
||||
private readonly _process: jaegerTypes.ThriftProcess;
|
||||
private readonly _sender: typeof jaegerTypes.UDPSender;
|
||||
private readonly _forceFlush: boolean = true;
|
||||
private readonly _flushTimeout: number;
|
||||
private _timer: NodeJS.Timeout;
|
||||
private readonly _forceFlushOnShutdown: boolean = true;
|
||||
private readonly _onShutdownFlushTimeout: number;
|
||||
|
||||
constructor(config: jaegerTypes.ExporterConfig) {
|
||||
this._logger = config.logger || new NoopLogger();
|
||||
const tags: jaegerTypes.Tag[] = config.tags || [];
|
||||
if (config.forceFlush !== undefined) {
|
||||
this._forceFlush = config.forceFlush;
|
||||
}
|
||||
this._flushTimeout = config.flushTimeout || 2000;
|
||||
this._forceFlushOnShutdown =
|
||||
typeof config.forceFlush === 'boolean' ? config.forceFlush : true;
|
||||
this._onShutdownFlushTimeout =
|
||||
typeof config.flushTimeout === 'number' ? config.flushTimeout : 2000;
|
||||
|
||||
this._sender = new jaegerTypes.UDPSender(config);
|
||||
this._process = {
|
||||
|
|
@ -47,10 +45,6 @@ export class JaegerExporter implements SpanExporter {
|
|||
tags: jaegerTypes.ThriftUtils.getThriftTags(tags),
|
||||
};
|
||||
this._sender.setProcess(this._process);
|
||||
|
||||
const flushInterval = config.flushInterval || 5000;
|
||||
this._timer = setInterval(this._flush.bind(this), flushInterval);
|
||||
unrefTimer(this._timer);
|
||||
}
|
||||
|
||||
/** Exports a list of spans to Jaeger. */
|
||||
|
|
@ -58,48 +52,70 @@ export class JaegerExporter implements SpanExporter {
|
|||
spans: ReadableSpan[],
|
||||
resultCallback: (result: ExportResult) => void
|
||||
): void {
|
||||
if (spans.length === 0) {
|
||||
return resultCallback(ExportResult.SUCCESS);
|
||||
}
|
||||
this._logger.debug('Jaeger exporter export');
|
||||
return this._sendSpans(spans, resultCallback);
|
||||
this._sendSpans(spans, resultCallback).catch(err => {
|
||||
this._logger.error(`JaegerExporter failed to export: ${err}`);
|
||||
});
|
||||
}
|
||||
|
||||
/** Shutdown exporter. */
|
||||
shutdown(): void {
|
||||
if (!this._forceFlush) return;
|
||||
if (!this._forceFlushOnShutdown) return;
|
||||
// Make an optimistic flush.
|
||||
this._flush();
|
||||
// Sleeping x seconds before closing the sender's connection to ensure
|
||||
// all spans are flushed.
|
||||
setTimeout(() => {
|
||||
this._sender.close();
|
||||
}, this._flushTimeout);
|
||||
}, this._onShutdownFlushTimeout);
|
||||
}
|
||||
|
||||
/** Transform spans and sends to Jaeger service. */
|
||||
private _sendSpans(
|
||||
private async _sendSpans(
|
||||
spans: ReadableSpan[],
|
||||
done?: (result: ExportResult) => void
|
||||
) {
|
||||
const thriftSpan = spans.map(span => spanToThrift(span));
|
||||
for (const span of thriftSpan) {
|
||||
this._sender.append(span, (numSpans: number, err?: string) => {
|
||||
if (err) {
|
||||
// @todo: decide whether to break out the loop on first error.
|
||||
this._logger.error(`failed to append span: ${err}`);
|
||||
if (done) return done(ExportResult.FAILED_NOT_RETRYABLE);
|
||||
}
|
||||
});
|
||||
try {
|
||||
await this._append(span);
|
||||
} catch (err) {
|
||||
this._logger.error(`failed to append span: ${err}`);
|
||||
// TODO right now we break out on first error, is that desirable?
|
||||
if (done) return done(ExportResult.FAILED_NOT_RETRYABLE);
|
||||
}
|
||||
}
|
||||
// @todo: We should wait for all the callbacks of the append calls to
|
||||
// complete before it calls done with success.
|
||||
this._logger.debug('successful append for : %s', thriftSpan.length);
|
||||
|
||||
// Flush all spans on each export. No-op if span buffer is empty
|
||||
await this._flush();
|
||||
|
||||
if (done) return done(ExportResult.SUCCESS);
|
||||
}
|
||||
|
||||
private _flush(): void {
|
||||
this._sender.flush((numSpans: number, err?: string) => {
|
||||
if (err) {
|
||||
this._logger.error(`failed to flush ${numSpans} spans: ${err}`);
|
||||
}
|
||||
private async _append(span: jaegerTypes.ThriftSpan): Promise<number> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this._sender.append(span, (count: number, err?: string) => {
|
||||
if (err) {
|
||||
return reject(new Error(err));
|
||||
}
|
||||
resolve(count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async _flush(): Promise<void> {
|
||||
await new Promise((resolve, reject) => {
|
||||
this._sender.flush((_count: number, err?: string) => {
|
||||
if (err) {
|
||||
return reject(new Error(err));
|
||||
}
|
||||
this._logger.debug('successful flush for %s spans', _count);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,9 +26,10 @@ export interface ExporterConfig {
|
|||
host?: string; // default: 'localhost'
|
||||
port?: number; // default: 6832
|
||||
maxPacketSize?: number; // default: 65000
|
||||
/** Force a flush on shutdown */
|
||||
forceFlush?: boolean; // default: true
|
||||
/** Time to wait for an onShutdown flush to finish before closing the sender */
|
||||
flushTimeout?: number; // default: 2000
|
||||
flushInterval?: number; // default(ms): 5000
|
||||
}
|
||||
|
||||
// Below require is needed as jaeger-client types does not expose the thrift,
|
||||
|
|
|
|||
|
|
@ -61,8 +61,8 @@ describe('JaegerExporter', () => {
|
|||
assert.ok(typeof exporter.export === 'function');
|
||||
assert.ok(typeof exporter.shutdown === 'function');
|
||||
|
||||
assert.ok(exporter['_forceFlush']);
|
||||
assert.strictEqual(exporter['_flushTimeout'], 5000);
|
||||
assert.ok(exporter['_forceFlushOnShutdown']);
|
||||
assert.strictEqual(exporter['_onShutdownFlushTimeout'], 5000);
|
||||
});
|
||||
|
||||
it('should construct an exporter without forceFlush and flushTimeout', () => {
|
||||
|
|
@ -72,8 +72,8 @@ describe('JaegerExporter', () => {
|
|||
assert.ok(typeof exporter.export === 'function');
|
||||
assert.ok(typeof exporter.shutdown === 'function');
|
||||
|
||||
assert.ok(exporter['_forceFlush']);
|
||||
assert.strictEqual(exporter['_flushTimeout'], 2000);
|
||||
assert.ok(exporter['_forceFlushOnShutdown']);
|
||||
assert.strictEqual(exporter['_onShutdownFlushTimeout'], 2000);
|
||||
});
|
||||
|
||||
it('should construct an exporter with forceFlush = false', () => {
|
||||
|
|
@ -84,7 +84,7 @@ describe('JaegerExporter', () => {
|
|||
assert.ok(typeof exporter.export === 'function');
|
||||
assert.ok(typeof exporter.shutdown === 'function');
|
||||
|
||||
assert.ok(!exporter['_forceFlush']);
|
||||
assert.ok(!exporter['_forceFlushOnShutdown']);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ export class BatchSpanProcessor implements SpanProcessor {
|
|||
: DEFAULT_BUFFER_TIMEOUT_MS;
|
||||
|
||||
this._timer = setInterval(() => {
|
||||
if (Date.now() - this._lastSpanFlush >= this._bufferTimeout) {
|
||||
if (this._shouldFlush()) {
|
||||
this._flush();
|
||||
}
|
||||
}, this._bufferTimeout);
|
||||
|
|
@ -73,6 +73,13 @@ export class BatchSpanProcessor implements SpanProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private _shouldFlush(): boolean {
|
||||
return (
|
||||
this._finishedSpans.length >= 0 &&
|
||||
Date.now() - this._lastSpanFlush >= this._bufferTimeout
|
||||
);
|
||||
}
|
||||
|
||||
/** Send the span data list to exporter */
|
||||
private _flush() {
|
||||
this._exporter.export(this._finishedSpans, () => {});
|
||||
|
|
|
|||
Loading…
Reference in New Issue