grpc-js: Add unique ID number to call trace logs

This commit is contained in:
murgatroid99 2019-12-13 14:12:05 -08:00
parent 60a2b4fb2a
commit 992fd21a51
2 changed files with 55 additions and 50 deletions

View File

@ -33,10 +33,6 @@ import { LogVerbosity } from './constants';
const TRACER_NAME = 'call_stream'; const TRACER_NAME = 'call_stream';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const { const {
HTTP2_HEADER_STATUS, HTTP2_HEADER_STATUS,
HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_CONTENT_TYPE,
@ -129,7 +125,8 @@ export class Http2CallStream extends Duplex implements Call {
private readonly channel: ChannelImplementation, private readonly channel: ChannelImplementation,
private readonly options: CallStreamOptions, private readonly options: CallStreamOptions,
filterStackFactory: FilterStackFactory, filterStackFactory: FilterStackFactory,
private readonly channelCallCredentials: CallCredentials private readonly channelCallCredentials: CallCredentials,
private readonly callNumber: number
) { ) {
super({ objectMode: true }); super({ objectMode: true });
this.filterStack = filterStackFactory.createFilter(this); this.filterStack = filterStackFactory.createFilter(this);
@ -143,11 +140,18 @@ export class Http2CallStream extends Duplex implements Call {
}; };
} }
private trace(text: string): void {
logging.trace(
LogVerbosity.DEBUG,
TRACER_NAME,
'[' + this.callNumber + '] ' + text
);
}
// tslint:disable-next-line:no-any // tslint:disable-next-line:no-any
push(chunk: any, encoding?: string): boolean { push(chunk: any, encoding?: string): boolean {
trace( this.trace(
this.methodName + 'pushing to reader message of length ' +
'pushing to reader message of length ' +
(chunk instanceof Buffer ? chunk.length : null) (chunk instanceof Buffer ? chunk.length : null)
); );
return super.push(chunk); return super.push(chunk);
@ -160,9 +164,8 @@ export class Http2CallStream extends Duplex implements Call {
*/ */
private endCall(status: StatusObject): void { private endCall(status: StatusObject): void {
if (this.finalStatus === null) { if (this.finalStatus === null) {
trace( this.trace(
this.methodName + 'ended with status: code=' +
' ended with status: code=' +
status.code + status.code +
' details="' + ' details="' +
status.details + status.details +
@ -203,10 +206,8 @@ export class Http2CallStream extends Duplex implements Call {
(this.http2Stream as http2.ClientHttp2Stream).pause(); (this.http2Stream as http2.ClientHttp2Stream).pause();
} }
} else { } else {
trace( this.trace(
this.methodName + 'unpushedReadMessages.push message of length ' + message.length
' unpushedReadMessages.push message of length ' +
message.length
); );
this.unpushedReadMessages.push(message); this.unpushedReadMessages.push(message);
} }
@ -232,11 +233,7 @@ export class Http2CallStream extends Duplex implements Call {
} }
return; return;
} }
trace( this.trace('filterReceivedMessage of length ' + framedMessage.length);
this.methodName +
' filterReceivedMessage of length ' +
framedMessage.length
);
this.isReadFilterPending = true; this.isReadFilterPending = true;
this.filterStack this.filterStack
.receiveMessage(Promise.resolve(framedMessage)) .receiveMessage(Promise.resolve(framedMessage))
@ -248,9 +245,10 @@ export class Http2CallStream extends Duplex implements Call {
private tryPush(messageBytes: Buffer | null): void { private tryPush(messageBytes: Buffer | null): void {
if (this.isReadFilterPending) { if (this.isReadFilterPending) {
trace( this.trace(
this.methodName + '[' +
' unfilteredReadMessages.push message of length ' + this.callNumber +
'] unfilteredReadMessages.push message of length ' +
(messageBytes && messageBytes.length) (messageBytes && messageBytes.length)
); );
this.unfilteredReadMessages.push(messageBytes); this.unfilteredReadMessages.push(messageBytes);
@ -260,7 +258,7 @@ export class Http2CallStream extends Duplex implements Call {
} }
private handleTrailers(headers: http2.IncomingHttpHeaders) { private handleTrailers(headers: http2.IncomingHttpHeaders) {
trace(this.methodName + ' received HTTP/2 trailing headers frame'); this.trace('received HTTP/2 trailing headers frame');
const code: Status = this.mappedStatusCode; const code: Status = this.mappedStatusCode;
const details = ''; const details = '';
let metadata: Metadata; let metadata: Metadata;
@ -303,17 +301,15 @@ export class Http2CallStream extends Duplex implements Call {
if (this.finalStatus !== null) { if (this.finalStatus !== null) {
stream.close(NGHTTP2_CANCEL); stream.close(NGHTTP2_CANCEL);
} else { } else {
trace( this.trace(
this.methodName + 'attachHttp2Stream from subchannel ' + subchannel.getAddress()
' attachHttp2Stream from subchannel ' +
subchannel.getAddress()
); );
this.http2Stream = stream; this.http2Stream = stream;
this.subchannel = subchannel; this.subchannel = subchannel;
subchannel.addDisconnectListener(this.disconnectListener); subchannel.addDisconnectListener(this.disconnectListener);
subchannel.callRef(); subchannel.callRef();
stream.on('response', (headers, flags) => { stream.on('response', (headers, flags) => {
trace(this.methodName + ' received HTTP/2 headers frame'); this.trace('received HTTP/2 headers frame');
switch (headers[':status']) { switch (headers[':status']) {
// TODO(murgatroid99): handle 100 and 101 // TODO(murgatroid99): handle 100 and 101
case 400: case 400:
@ -369,28 +365,20 @@ export class Http2CallStream extends Duplex implements Call {
}); });
stream.on('trailers', this.handleTrailers.bind(this)); stream.on('trailers', this.handleTrailers.bind(this));
stream.on('data', (data: Buffer) => { stream.on('data', (data: Buffer) => {
trace( this.trace('receive HTTP/2 data frame of length ' + data.length);
this.methodName +
' receive HTTP/2 data frame of length ' +
data.length
);
const messages = this.decoder.write(data); const messages = this.decoder.write(data);
for (const message of messages) { for (const message of messages) {
trace( this.trace('parsed message of length ' + message.length);
this.methodName + ' parsed message of length ' + message.length
);
this.tryPush(message); this.tryPush(message);
} }
}); });
stream.on('end', () => { stream.on('end', () => {
trace(this.methodName + ' received HTTP/2 end of data flag'); this.trace('received HTTP/2 end of data flag');
this.tryPush(null); this.tryPush(null);
}); });
stream.on('close', async () => { stream.on('close', async () => {
trace( this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
this.methodName + ' HTTP/2 stream closed with code ' + stream.rstCode
);
let code: Status; let code: Status;
let details = ''; let details = '';
switch (stream.rstCode) { switch (stream.rstCode) {
@ -437,13 +425,14 @@ export class Http2CallStream extends Duplex implements Call {
stream.write(this.pendingWrite, this.pendingWriteCallback); stream.write(this.pendingWrite, this.pendingWriteCallback);
} }
if (this.pendingFinalCallback) { if (this.pendingFinalCallback) {
this.trace('calling end() on HTTP/2 stream');
stream.end(this.pendingFinalCallback); stream.end(this.pendingFinalCallback);
} }
} }
} }
sendMetadata(metadata: Metadata): void { sendMetadata(metadata: Metadata): void {
trace(this.methodName + ' Sending metadata'); this.trace('Sending metadata');
this.channel._startCallStream(this, metadata); this.channel._startCallStream(this, metadata);
} }
@ -522,11 +511,7 @@ export class Http2CallStream extends Duplex implements Call {
} }
_write(chunk: WriteObject, encoding: string, cb: WriteCallback) { _write(chunk: WriteObject, encoding: string, cb: WriteCallback) {
trace( this.trace('write() called with message of length ' + chunk.message.length);
this.methodName +
' write() called with message of length ' +
chunk.message.length
);
this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => { this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => {
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingWrite = message.message; this.pendingWrite = message.message;
@ -538,10 +523,11 @@ export class Http2CallStream extends Duplex implements Call {
} }
_final(cb: Function) { _final(cb: Function) {
trace(this.methodName + ' end() called'); this.trace('end() called');
if (this.http2Stream === null) { if (this.http2Stream === null) {
this.pendingFinalCallback = cb; this.pendingFinalCallback = cb;
} else { } else {
this.trace('calling end() on HTTP/2 stream');
this.http2Stream.end(cb); this.http2Stream.end(cb);
} }
} }

View File

@ -47,6 +47,17 @@ export enum ConnectivityState {
SHUTDOWN, SHUTDOWN,
} }
let nextCallNumber = 0;
function getNewCallNumber(): number {
const callNumber = nextCallNumber;
nextCallNumber += 1;
if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
nextCallNumber = 0;
}
return callNumber;
}
/** /**
* An interface that represents a communication channel to a server specified * An interface that represents a communication channel to a server specified
* by a given address. * by a given address.
@ -357,10 +368,17 @@ export class ChannelImplementation implements Channel {
if (this.connectivityState === ConnectivityState.SHUTDOWN) { if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down'); throw new Error('Channel has been shut down');
} }
const callNumber = getNewCallNumber();
trace( trace(
LogVerbosity.DEBUG, LogVerbosity.DEBUG,
'channel', 'channel',
'createCall(method="' + method + '", deadline=' + deadline + ')' this.target +
' createCall [' +
callNumber +
'] method="' +
method +
'", deadline=' +
deadline
); );
const finalOptions: CallStreamOptions = { const finalOptions: CallStreamOptions = {
deadline: deadline:
@ -374,7 +392,8 @@ export class ChannelImplementation implements Channel {
this, this,
finalOptions, finalOptions,
this.filterStackFactory, this.filterStackFactory,
this.credentials._getCallCredentials() this.credentials._getCallCredentials(),
callNumber
); );
return stream; return stream;
} }