mirror of https://github.com/grpc/grpc-node.git
Merge pull request #1212 from murgatroid99/grpc-js_call_tracers
grpc-js: Add low-level call tracers
This commit is contained in:
commit
886f31404c
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@grpc/grpc-js",
|
"name": "@grpc/grpc-js",
|
||||||
"version": "0.6.13",
|
"version": "0.6.14",
|
||||||
"description": "gRPC Library for Node - pure JS implementation",
|
"description": "gRPC Library for Node - pure JS implementation",
|
||||||
"homepage": "https://grpc.io/",
|
"homepage": "https://grpc.io/",
|
||||||
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
|
||||||
|
|
|
@ -28,6 +28,14 @@ import { ObjectDuplex, WriteCallback } from './object-stream';
|
||||||
import { StreamDecoder } from './stream-decoder';
|
import { StreamDecoder } from './stream-decoder';
|
||||||
import { ChannelImplementation } from './channel';
|
import { ChannelImplementation } from './channel';
|
||||||
import { Subchannel } from './subchannel';
|
import { Subchannel } from './subchannel';
|
||||||
|
import * as logging from './logging';
|
||||||
|
import { LogVerbosity } from './constants';
|
||||||
|
|
||||||
|
const TRACER_NAME = 'call_stream';
|
||||||
|
|
||||||
|
function trace(text: string): void {
|
||||||
|
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
|
||||||
|
}
|
||||||
|
|
||||||
const {
|
const {
|
||||||
HTTP2_HEADER_STATUS,
|
HTTP2_HEADER_STATUS,
|
||||||
|
@ -135,6 +143,16 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tslint:disable-next-line:no-any
|
||||||
|
push(chunk: any, encoding?: string): boolean {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
'pushing to reader message of length ' +
|
||||||
|
(chunk instanceof Buffer ? chunk.length : null)
|
||||||
|
);
|
||||||
|
return super.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* On first call, emits a 'status' event with the given StatusObject.
|
* On first call, emits a 'status' event with the given StatusObject.
|
||||||
* Subsequent calls are no-ops.
|
* Subsequent calls are no-ops.
|
||||||
|
@ -142,6 +160,14 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
*/
|
*/
|
||||||
private endCall(status: StatusObject): void {
|
private endCall(status: StatusObject): void {
|
||||||
if (this.finalStatus === null) {
|
if (this.finalStatus === null) {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' ended with status: code=' +
|
||||||
|
status.code +
|
||||||
|
' details="' +
|
||||||
|
status.details +
|
||||||
|
'"'
|
||||||
|
);
|
||||||
this.finalStatus = status;
|
this.finalStatus = status;
|
||||||
/* We do this asynchronously to ensure that no async function is in the
|
/* We do this asynchronously to ensure that no async function is in the
|
||||||
* call stack when we return control to the application. If an async
|
* call stack when we return control to the application. If an async
|
||||||
|
@ -177,6 +203,11 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
(this.http2Stream as http2.ClientHttp2Stream).pause();
|
(this.http2Stream as http2.ClientHttp2Stream).pause();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' unpushedReadMessages.push message of length ' +
|
||||||
|
message.length
|
||||||
|
);
|
||||||
this.unpushedReadMessages.push(message);
|
this.unpushedReadMessages.push(message);
|
||||||
}
|
}
|
||||||
if (this.unfilteredReadMessages.length > 0) {
|
if (this.unfilteredReadMessages.length > 0) {
|
||||||
|
@ -201,6 +232,11 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' filterReceivedMessage of length ' +
|
||||||
|
framedMessage.length
|
||||||
|
);
|
||||||
this.isReadFilterPending = true;
|
this.isReadFilterPending = true;
|
||||||
this.filterStack
|
this.filterStack
|
||||||
.receiveMessage(Promise.resolve(framedMessage))
|
.receiveMessage(Promise.resolve(framedMessage))
|
||||||
|
@ -212,6 +248,11 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
|
|
||||||
private tryPush(messageBytes: Buffer | null): void {
|
private tryPush(messageBytes: Buffer | null): void {
|
||||||
if (this.isReadFilterPending) {
|
if (this.isReadFilterPending) {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' unfilteredReadMessages.push message of length ' +
|
||||||
|
(messageBytes && messageBytes.length)
|
||||||
|
);
|
||||||
this.unfilteredReadMessages.push(messageBytes);
|
this.unfilteredReadMessages.push(messageBytes);
|
||||||
} else {
|
} else {
|
||||||
this.filterReceivedMessage(messageBytes);
|
this.filterReceivedMessage(messageBytes);
|
||||||
|
@ -219,6 +260,7 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleTrailers(headers: http2.IncomingHttpHeaders) {
|
private handleTrailers(headers: http2.IncomingHttpHeaders) {
|
||||||
|
trace(this.methodName + ' received HTTP/2 trailing headers frame');
|
||||||
const code: Status = this.mappedStatusCode;
|
const code: Status = this.mappedStatusCode;
|
||||||
const details = '';
|
const details = '';
|
||||||
let metadata: Metadata;
|
let metadata: Metadata;
|
||||||
|
@ -261,11 +303,17 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
if (this.finalStatus !== null) {
|
if (this.finalStatus !== null) {
|
||||||
stream.close(NGHTTP2_CANCEL);
|
stream.close(NGHTTP2_CANCEL);
|
||||||
} else {
|
} else {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' attachHttp2Stream from subchannel ' +
|
||||||
|
subchannel.getAddress()
|
||||||
|
);
|
||||||
this.http2Stream = stream;
|
this.http2Stream = stream;
|
||||||
this.subchannel = subchannel;
|
this.subchannel = subchannel;
|
||||||
subchannel.addDisconnectListener(this.disconnectListener);
|
subchannel.addDisconnectListener(this.disconnectListener);
|
||||||
subchannel.callRef();
|
subchannel.callRef();
|
||||||
stream.on('response', (headers, flags) => {
|
stream.on('response', (headers, flags) => {
|
||||||
|
trace(this.methodName + ' received HTTP/2 headers frame');
|
||||||
switch (headers[':status']) {
|
switch (headers[':status']) {
|
||||||
// TODO(murgatroid99): handle 100 and 101
|
// TODO(murgatroid99): handle 100 and 101
|
||||||
case 400:
|
case 400:
|
||||||
|
@ -321,16 +369,28 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
});
|
});
|
||||||
stream.on('trailers', this.handleTrailers.bind(this));
|
stream.on('trailers', this.handleTrailers.bind(this));
|
||||||
stream.on('data', (data: Buffer) => {
|
stream.on('data', (data: Buffer) => {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' receive HTTP/2 data frame of length ' +
|
||||||
|
data.length
|
||||||
|
);
|
||||||
const messages = this.decoder.write(data);
|
const messages = this.decoder.write(data);
|
||||||
|
|
||||||
for (const message of messages) {
|
for (const message of messages) {
|
||||||
|
trace(
|
||||||
|
this.methodName + ' parsed message of length ' + message.length
|
||||||
|
);
|
||||||
this.tryPush(message);
|
this.tryPush(message);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
stream.on('end', () => {
|
stream.on('end', () => {
|
||||||
|
trace(this.methodName + ' received HTTP/2 end of data flag');
|
||||||
this.tryPush(null);
|
this.tryPush(null);
|
||||||
});
|
});
|
||||||
stream.on('close', async () => {
|
stream.on('close', async () => {
|
||||||
|
trace(
|
||||||
|
this.methodName + ' HTTP/2 stream closed with code ' + stream.rstCode
|
||||||
|
);
|
||||||
let code: Status;
|
let code: Status;
|
||||||
let details = '';
|
let details = '';
|
||||||
switch (stream.rstCode) {
|
switch (stream.rstCode) {
|
||||||
|
@ -383,6 +443,7 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
sendMetadata(metadata: Metadata): void {
|
sendMetadata(metadata: Metadata): void {
|
||||||
|
trace(this.methodName + ' Sending metadata');
|
||||||
this.channel._startCallStream(this, metadata);
|
this.channel._startCallStream(this, metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,6 +522,11 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
_write(chunk: WriteObject, encoding: string, cb: WriteCallback) {
|
_write(chunk: WriteObject, encoding: string, cb: WriteCallback) {
|
||||||
|
trace(
|
||||||
|
this.methodName +
|
||||||
|
' write() called with message of length ' +
|
||||||
|
chunk.message.length
|
||||||
|
);
|
||||||
this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => {
|
this.filterStack.sendMessage(Promise.resolve(chunk)).then(message => {
|
||||||
if (this.http2Stream === null) {
|
if (this.http2Stream === null) {
|
||||||
this.pendingWrite = message.message;
|
this.pendingWrite = message.message;
|
||||||
|
@ -472,6 +538,7 @@ export class Http2CallStream extends Duplex implements Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
_final(cb: Function) {
|
_final(cb: Function) {
|
||||||
|
trace(this.methodName + ' end() called');
|
||||||
if (this.http2Stream === null) {
|
if (this.http2Stream === null) {
|
||||||
this.pendingFinalCallback = cb;
|
this.pendingFinalCallback = cb;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -357,6 +357,11 @@ export class ChannelImplementation implements Channel {
|
||||||
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
||||||
throw new Error('Channel has been shut down');
|
throw new Error('Channel has been shut down');
|
||||||
}
|
}
|
||||||
|
trace(
|
||||||
|
LogVerbosity.DEBUG,
|
||||||
|
'channel',
|
||||||
|
'createCall(method="' + method + '", deadline=' + deadline + ')'
|
||||||
|
);
|
||||||
const finalOptions: CallStreamOptions = {
|
const finalOptions: CallStreamOptions = {
|
||||||
deadline:
|
deadline:
|
||||||
deadline === null || deadline === undefined ? Infinity : deadline,
|
deadline === null || deadline === undefined ? Infinity : deadline,
|
||||||
|
|
|
@ -395,6 +395,13 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
callRef() {
|
callRef() {
|
||||||
|
trace(
|
||||||
|
this.subchannelAddress +
|
||||||
|
' callRefcount ' +
|
||||||
|
this.callRefcount +
|
||||||
|
' -> ' +
|
||||||
|
(this.callRefcount + 1)
|
||||||
|
);
|
||||||
if (this.callRefcount === 0) {
|
if (this.callRefcount === 0) {
|
||||||
if (this.session) {
|
if (this.session) {
|
||||||
this.session.ref();
|
this.session.ref();
|
||||||
|
@ -405,6 +412,13 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
callUnref() {
|
callUnref() {
|
||||||
|
trace(
|
||||||
|
this.subchannelAddress +
|
||||||
|
' callRefcount ' +
|
||||||
|
this.callRefcount +
|
||||||
|
' -> ' +
|
||||||
|
(this.callRefcount - 1)
|
||||||
|
);
|
||||||
this.callRefcount -= 1;
|
this.callRefcount -= 1;
|
||||||
if (this.callRefcount === 0) {
|
if (this.callRefcount === 0) {
|
||||||
if (this.session) {
|
if (this.session) {
|
||||||
|
@ -416,10 +430,24 @@ export class Subchannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
ref() {
|
ref() {
|
||||||
|
trace(
|
||||||
|
this.subchannelAddress +
|
||||||
|
' callRefcount ' +
|
||||||
|
this.refcount +
|
||||||
|
' -> ' +
|
||||||
|
(this.refcount + 1)
|
||||||
|
);
|
||||||
this.refcount += 1;
|
this.refcount += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
unref() {
|
unref() {
|
||||||
|
trace(
|
||||||
|
this.subchannelAddress +
|
||||||
|
' callRefcount ' +
|
||||||
|
this.refcount +
|
||||||
|
' -> ' +
|
||||||
|
(this.refcount - 1)
|
||||||
|
);
|
||||||
this.refcount -= 1;
|
this.refcount -= 1;
|
||||||
this.checkBothRefcounts();
|
this.checkBothRefcounts();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue