feat(plugin): pg-pool plugin implementation (#501)

* feat: pg-pool plugin implementation

* feat: pg-pool plugin implementation

* fix: linting

* fix: add attributes for span & add tests for pool.query()

* fix: add span.setStatus

* chore: address comments

* fix: linting

Co-authored-by: Mayur Kale <mayurkale@google.com>
This commit is contained in:
Xiao 2019-12-23 12:02:34 -08:00 committed by Mayur Kale
parent 38ce1c3eac
commit 1a2d926e02
8 changed files with 613 additions and 0 deletions

View File

@ -0,0 +1,36 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export enum AttributeNames {
// required by https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-semantic-conventions.md#databases-client-calls
COMPONENT = 'component',
DB_TYPE = 'db.type',
DB_INSTANCE = 'db.instance',
DB_STATEMENT = 'db.statement',
PEER_ADDRESS = 'peer.address',
PEER_HOSTNAME = 'peer.host',
// optional
DB_USER = 'db.user',
PEER_PORT = 'peer.port',
PEER_IPV4 = 'peer.ipv4',
PEER_IPV6 = 'peer.ipv6',
PEER_SERVICE = 'peer.service',
// PG-POOL specific -- not specified by spec
IDLE_TIMEOUT_MILLIS = 'idle.timeout.millis',
MAX_CLIENT = 'max',
}

View File

@ -0,0 +1,17 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export * from './pg-pool';

View File

@ -13,3 +13,120 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { BasePlugin } from '@opentelemetry/core';
import { CanonicalCode, SpanKind } from '@opentelemetry/types';
import { AttributeNames } from './enums';
import * as shimmer from 'shimmer';
import * as pgPoolTypes from 'pg-pool';
import {
PostgresPoolPluginOptions,
PgPoolCallback,
PgPoolExtended,
} from './types';
import * as utils from './utils';
export class PostgresPoolPlugin extends BasePlugin<typeof pgPoolTypes> {
protected _config: PostgresPoolPluginOptions;
static readonly COMPONENT = 'pg-pool';
static readonly DB_TYPE = 'sql';
readonly supportedVersions = ['2.*'];
constructor(readonly moduleName: string) {
super();
this._config = {};
}
protected patch(): typeof pgPoolTypes {
shimmer.wrap(
this._moduleExports.prototype,
'connect',
this._getPoolConnectPatch() as never
);
return this._moduleExports;
}
protected unpatch(): void {
shimmer.unwrap(this._moduleExports.prototype, 'connect');
}
private _getPoolConnectPatch() {
const plugin = this;
return (originalConnect: typeof pgPoolTypes.prototype.connect) => {
plugin._logger.debug(
`Patching ${PostgresPoolPlugin.COMPONENT}.prototype.connect`
);
return function connect(this: PgPoolExtended, callback?: PgPoolCallback) {
const jdbcString = utils.getJDBCString(this.options);
// setup span
const span = plugin._tracer.startSpan(
`${PostgresPoolPlugin.COMPONENT}.connect`,
{
kind: SpanKind.CLIENT,
parent: plugin._tracer.getCurrentSpan() || undefined,
attributes: {
[AttributeNames.COMPONENT]: PostgresPoolPlugin.COMPONENT, // required
[AttributeNames.DB_TYPE]: PostgresPoolPlugin.DB_TYPE, // required
[AttributeNames.DB_INSTANCE]: this.options.database, // required
[AttributeNames.PEER_HOSTNAME]: this.options.host, // required
[AttributeNames.PEER_ADDRESS]: jdbcString, // required
[AttributeNames.PEER_PORT]: this.options.port,
[AttributeNames.DB_USER]: this.options.user,
[AttributeNames.IDLE_TIMEOUT_MILLIS]: this.options
.idleTimeoutMillis,
[AttributeNames.MAX_CLIENT]: this.options.maxClient,
},
}
);
if (callback) {
const parentSpan = plugin._tracer.getCurrentSpan();
callback = utils.patchCallback(span, callback) as PgPoolCallback;
// If a parent span exists, bind the callback
if (parentSpan) {
callback = plugin._tracer.bind(callback);
}
}
const connectResult: unknown = originalConnect.call(
this,
callback as never
);
// No callback was provided, return a promise instead
if (connectResult instanceof Promise) {
const connectResultPromise = connectResult as Promise<unknown>;
return plugin._tracer.bind(
connectResultPromise
.then((result: any) => {
// Resturn a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise((resolve, _) => {
span.setStatus({ code: CanonicalCode.OK });
span.end();
resolve(result);
});
})
.catch((error: Error) => {
return new Promise((_, reject) => {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message,
});
span.end();
reject(error);
});
})
);
}
// Else a callback was provided, so just return the result
return connectResult;
};
};
}
}
export const plugin = new PostgresPoolPlugin(PostgresPoolPlugin.COMPONENT);

View File

@ -0,0 +1,39 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as pgTypes from 'pg';
import * as pgPoolTypes from 'pg-pool';
export interface PostgresPoolPluginOptions {}
export type PgPoolCallback = (
err: Error,
client: any,
done: (release?: any) => void
) => void;
export interface PgPoolOptionsParams {
database: string;
host: string;
port: number;
user: string;
idleTimeoutMillis: number; // the minimum amount of time that an object may sit idle in the pool before it is eligible for eviction due to idle time
maxClient: number; // maximum size of the pool
}
export interface PgPoolExtended extends pgPoolTypes<pgTypes.Client> {
options: PgPoolOptionsParams;
}

View File

@ -0,0 +1,45 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Span, CanonicalCode } from '@opentelemetry/types';
import { PgPoolOptionsParams, PgPoolCallback, PgPoolExtended } from './types';
export function getJDBCString(params: PgPoolOptionsParams) {
const host = params.host || 'localhost'; // postgres defaults to localhost
const port = params.port || 5432; // postgres defaults to port 5432
const database = params.database || '';
return `jdbc:postgresql://${host}:${port}/${database}`;
}
export function patchCallback(span: Span, cb: PgPoolCallback): PgPoolCallback {
return function patchedCallback(
this: PgPoolExtended,
err: Error,
res: object,
done: any
) {
if (err) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: err.message,
});
} else if (res) {
span.setStatus({ code: CanonicalCode.OK });
}
span.end();
cb.call(this, err, res, done);
};
}

View File

@ -0,0 +1,79 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
SpanKind,
Attributes,
Event,
Span,
TimedEvent,
} from '@opentelemetry/types';
import * as assert from 'assert';
import { ReadableSpan } from '@opentelemetry/tracing';
import {
hrTimeToMilliseconds,
hrTimeToMicroseconds,
} from '@opentelemetry/core';
export const assertSpan = (
span: ReadableSpan,
kind: SpanKind,
attributes: Attributes,
events: Event[]
) => {
assert.strictEqual(span.spanContext.traceId.length, 32);
assert.strictEqual(span.spanContext.spanId.length, 16);
assert.strictEqual(span.kind, kind);
// check all the AttributeNames fields
Object.keys(span.attributes).forEach(key => {
assert.deepStrictEqual(span.attributes[key], attributes[key]);
});
assert.ok(span.endTime);
assert.strictEqual(span.links.length, 0);
assert.ok(
hrTimeToMicroseconds(span.startTime) < hrTimeToMicroseconds(span.endTime)
);
assert.ok(hrTimeToMilliseconds(span.endTime) > 0);
// events
assert.strictEqual(
span.events.length,
events.length,
'Should contain same number of events'
);
span.events.forEach((_: TimedEvent, index: number) => {
assert.deepStrictEqual(span.events[index], events[index]);
});
};
// Check if sourceSpan was propagated to targetSpan
export const assertPropagation = (
childSpan: ReadableSpan,
parentSpan: Span
) => {
const targetSpanContext = childSpan.spanContext;
const sourceSpanContext = parentSpan.context();
assert.strictEqual(targetSpanContext.traceId, sourceSpanContext.traceId);
assert.strictEqual(childSpan.parentSpanId, sourceSpanContext.spanId);
assert.strictEqual(
targetSpanContext.traceFlags,
sourceSpanContext.traceFlags
);
assert.notStrictEqual(targetSpanContext.spanId, sourceSpanContext.spanId);
};

View File

@ -13,3 +13,229 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { NoopLogger } from '@opentelemetry/core';
import { NodeTracer } from '@opentelemetry/node';
import {
InMemorySpanExporter,
SimpleSpanProcessor,
} from '@opentelemetry/tracing';
import { SpanKind, Attributes, TimedEvent, Span } from '@opentelemetry/types';
import { plugin as pgPlugin, PostgresPlugin } from '@opentelemetry/plugin-pg';
import { plugin, PostgresPoolPlugin } from '../src';
import { AttributeNames } from '../src/enums';
import * as assert from 'assert';
import * as pg from 'pg';
import * as pgPool from 'pg-pool';
import * as assertionUtils from './assertionUtils';
import * as testUtils from './testUtils';
const memoryExporter = new InMemorySpanExporter();
const CONFIG = {
user: process.env.POSTGRES_USER || 'postgres',
database: process.env.POSTGRES_DB || 'postgres',
host: process.env.POSTGRES_HOST || 'localhost',
port: process.env.POSTGRES_PORT
? parseInt(process.env.POSTGRES_PORT, 10)
: 54320,
maxClient: 1,
idleTimeoutMillis: 10000,
};
const DEFAULT_PGPOOL_ATTRIBUTES = {
[AttributeNames.COMPONENT]: PostgresPoolPlugin.COMPONENT,
[AttributeNames.DB_INSTANCE]: CONFIG.database,
[AttributeNames.DB_TYPE]: PostgresPoolPlugin.DB_TYPE,
[AttributeNames.PEER_HOSTNAME]: CONFIG.host,
[AttributeNames.PEER_ADDRESS]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[AttributeNames.PEER_PORT]: CONFIG.port,
[AttributeNames.DB_USER]: CONFIG.user,
[AttributeNames.MAX_CLIENT]: CONFIG.maxClient,
[AttributeNames.IDLE_TIMEOUT_MILLIS]: CONFIG.idleTimeoutMillis,
};
const DEFAULT_PG_ATTRIBUTES = {
[AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT,
[AttributeNames.DB_INSTANCE]: CONFIG.database,
[AttributeNames.DB_TYPE]: PostgresPlugin.DB_TYPE,
[AttributeNames.PEER_HOSTNAME]: CONFIG.host,
[AttributeNames.PEER_ADDRESS]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[AttributeNames.PEER_PORT]: CONFIG.port,
[AttributeNames.DB_USER]: CONFIG.user,
};
const runCallbackTest = (
parentSpan: Span,
attributes: Attributes,
events: TimedEvent[],
spansLength = 1,
spansIndex = 0
) => {
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, spansLength);
const pgSpan = spans[spansIndex];
assertionUtils.assertSpan(pgSpan, SpanKind.CLIENT, attributes, events);
assertionUtils.assertPropagation(pgSpan, parentSpan);
};
describe('pg-pool@2.x', () => {
let pool: pgPool<pg.Client>;
const tracer = new NodeTracer();
const logger = new NoopLogger();
const testPostgres = process.env.TEST_POSTGRES; // For CI: assumes local postgres db is already available
const testPostgresLocally = process.env.TEST_POSTGRES_LOCAL; // For local: spins up local postgres db via docker
const shouldTest = testPostgres || testPostgresLocally; // Skips these tests if false (default)
before(function(done) {
if (!shouldTest) {
// this.skip() workaround
// https://github.com/mochajs/mocha/issues/2683#issuecomment-375629901
this.test!.parent!.pending = true;
this.skip();
}
pool = new pgPool(CONFIG);
tracer.addSpanProcessor(new SimpleSpanProcessor(memoryExporter));
if (testPostgresLocally) {
testUtils.startDocker();
}
done();
});
after(function(done) {
if (testPostgresLocally) {
testUtils.cleanUpDocker();
}
pool.end(() => {
done();
});
});
beforeEach(function() {
plugin.enable(pgPool, tracer, logger);
pgPlugin.enable(pg, tracer, logger);
});
afterEach(() => {
memoryExporter.reset();
plugin.disable();
pgPlugin.disable();
});
it('should return a plugin', () => {
assert.ok(plugin instanceof PostgresPoolPlugin);
});
it('should have correct moduleName', () => {
assert.strictEqual(plugin.moduleName, 'pg-pool');
});
describe('#pool.connect()', () => {
// promise - checkout a client
it('should intercept pool.connect()', async () => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[AttributeNames.DB_STATEMENT]: 'SELECT NOW()',
};
const events: TimedEvent[] = [];
const span = tracer.startSpan('test span');
await tracer.withSpan(span, async () => {
const client = await pool.connect();
runCallbackTest(span, pgPoolattributes, events, 1, 0);
assert.ok(client, 'pool.connect() returns a promise');
try {
await client.query('SELECT NOW()');
runCallbackTest(span, pgAttributes, events, 2, 1);
} catch (e) {
throw e;
} finally {
client.release();
}
});
});
// callback - checkout a client
it('should not return a promise if callback is provided', done => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[AttributeNames.DB_STATEMENT]: 'SELECT NOW()',
};
const events: TimedEvent[] = [];
const parentSpan = tracer.startSpan('test span');
tracer.withSpan(parentSpan, () => {
const resNoPromise = pool.connect((err, client, release) => {
if (err) {
return done(err);
}
release();
assert.ok(client);
runCallbackTest(parentSpan, pgPoolattributes, events, 1, 0);
client.query('SELECT NOW()', (err, ret) => {
if (err) {
return done(err);
}
assert.ok(ret);
runCallbackTest(parentSpan, pgAttributes, events, 2, 1);
done();
});
});
assert.strictEqual(resNoPromise, undefined, 'No promise is returned');
});
});
});
describe('#pool.query()', () => {
// promise
it('should call patched client.query()', async () => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[AttributeNames.DB_STATEMENT]: 'SELECT NOW()',
};
const events: TimedEvent[] = [];
const span = tracer.startSpan('test span');
await tracer.withSpan(span, async () => {
try {
const result = await pool.query('SELECT NOW()');
runCallbackTest(span, pgPoolattributes, events, 2, 0);
runCallbackTest(span, pgAttributes, events, 2, 1);
assert.ok(result, 'pool.query() returns a promise');
} catch (e) {
throw e;
}
});
});
// callback
it('should not return a promise if callback is provided', done => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[AttributeNames.DB_STATEMENT]: 'SELECT NOW()',
};
const events: TimedEvent[] = [];
const parentSpan = tracer.startSpan('test span');
tracer.withSpan(parentSpan, () => {
const resNoPromise = pool.query('SELECT NOW()', (err, result) => {
if (err) {
return done(err);
}
runCallbackTest(parentSpan, pgPoolattributes, events, 2, 0);
runCallbackTest(parentSpan, pgAttributes, events, 2, 1);
done();
});
assert.strictEqual(resNoPromise, undefined, 'No promise is returned');
});
});
});
});

View File

@ -0,0 +1,54 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as childProcess from 'child_process';
export function startDocker() {
const tasks = [
run('docker run -d -p 54320:5432 --name otpostgres postgres:alpine'),
];
for (let i = 0; i < tasks.length; i++) {
const task = tasks[i];
if (task && task.code !== 0) {
console.error('Failed to start container!');
console.error(task.output);
return false;
}
}
return true;
}
export function cleanUpDocker() {
run('docker stop otpostgres');
run('docker rm otpostgres');
}
function run(cmd: string) {
try {
const proc = childProcess.spawnSync(cmd, {
shell: true,
});
return {
code: proc.status,
output: proc.output
.map(v => String.fromCharCode.apply(null, v as any))
.join(''),
};
} catch (e) {
console.log(e);
return;
}
}