From ad598ecbe494680cf5db170406f5862df54f848e Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 14:53:46 -0700 Subject: [PATCH 01/25] Serverside keepalive error detection and cleanups - Bugfix: Ensure that if session.ping returns false we correctly identify fail the keepalive and connection - Bugfix: Ensure that if the interval between keepalives being sent occurs faster than the prior keepalive's timeout that we do not overwrite the reference to the prior timeout. Prior implementation could have in theory prevented a valid keepalive timeout from clearing itself. This rewrite keeps every timeout as a local (vs a shared state per session). Even if the timeout outlives the lifetime of a session, we still guard against errors by checking that the parent interval is not false-y. I reckon this could result in a short-term memory leak per session which is bounded for a maximum of keepaliveTimeoutMs. On the other hand even with that potential for a short reference hold, this implementation proposed here is more correct I think. One alternative we could do is keep a list of pending timeouts.. which is complex for a rare situation that will self resolve anyhow when keepaliveTimeoutMs is reached. - Bug Fix: keepalive intervals were being cleared with an incorrect clearTimeout before. Not sure if this was causing intervals leaks in some nodejs impls or not. (v20.13.1 seems to accept this mismatch without issue) - Rename variables for clarity, to prevent future bugs like swapping clearInterval vs clearTimeout. - Implementation is repeated in two places, per warning from https://github.com/grpc/grpc-node/pull/2756#issuecomment-2136031256 - This commit supercedes the prior PR on a master branch which was out of date. https://github.com/grpc/grpc-node/pull/2756 --- packages/grpc-js/src/server.ts | 205 +++++++++++++++++++++------------ 1 file changed, 134 insertions(+), 71 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index feb511b4..c4d87af3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1376,8 +1376,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keeapliveTimeTimer: NodeJS.Timeout | null = null; - let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; + let keepaliveInterval: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1421,41 +1420,74 @@ export class Server { } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keeapliveTimeTimer = setInterval(() => { - keepaliveTimeoutTimer = setTimeout(() => { - sessionClosedByServer = true; - session.close(); + keepaliveInterval = setInterval(() => { + // NOTE to self: document in PR that prior implementation would overwrite the prior pending timeout + // if the timeout had not occurred before the prior interval had elapsed (bad bug) + const keepaliveTimeout = setTimeout(() => { + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + sessionClosedByServer = true; + this.trace('Connection dropped by keepalive timeout'); + session.close(); + } }, this.keepaliveTimeoutMs); - keepaliveTimeoutTimer.unref?.(); + keepaliveTimeout.unref?.(); try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); + if ( + !session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(keepaliveTimeout); + if (err) { + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + } + sessionClosedByServer = true; + this.trace( + 'Connection dropped due to error with ping frame ' + + err.message + + ' return in ' + + duration + ); + session.close(); + } } - - if (err) { - sessionClosedByServer = true; - this.trace( - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } - } - ); + ) + ) { + throw new Error('Server keepalive ping send failed'); + } } catch (e) { - clearTimeout(keepaliveTimeoutTimer); - // The ping can't be sent because the session is already closed + // The ping can't be sent because the session is already closed, max outstanding pings reached, etc + clearTimeout(keepaliveTimeout); + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + } + this.trace( + 'Connection dropped due to error sending ping frame ' + + (e instanceof Error ? e.message : 'unknown error') + ); session.destroy(); } }, this.keepaliveTimeMs); - keeapliveTimeTimer.unref?.(); + keepaliveInterval.unref?.(); } + session.once('goaway', (errorCode, lastStreamID, opaqueData) => { + if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { + this.trace('Connection dropped by client due to ENHANCE_YOUR_CALM'); + } else { + this.trace( + 'Connection dropped by client via GOAWAY with error code ' + + errorCode + ); + } + sessionClosedByServer = true; + session.destroy(); + }); + session.on('close', () => { if (!sessionClosedByServer) { this.trace( @@ -1471,11 +1503,9 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keeapliveTimeTimer) { - clearInterval(keeapliveTimeTimer); - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; } if (idleTimeoutObj !== null) { @@ -1521,8 +1551,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keeapliveTimeTimer: NodeJS.Timeout | null = null; - let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; + let keepaliveInterval: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1565,49 +1594,85 @@ export class Server { } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keeapliveTimeTimer = setInterval(() => { - keepaliveTimeoutTimer = setTimeout(() => { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress - ); - - session.close(); + keepaliveInterval = setInterval(() => { + const keepaliveTimeout = setTimeout(() => { + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + session.close(); + } }, this.keepaliveTimeoutMs); - keepaliveTimeoutTimer.unref?.(); + keepaliveTimeout.unref?.(); try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); + if ( + !session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(keepaliveTimeout); + if (err) { + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + } + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to error with ping frame ' + + err.message + + ' return in ' + + duration + ); + session.close(); + } } - - if (err) { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - - session.close(); - } - } - ); + ) + ) { + throw new Error('Server keepalive ping send failed'); + } channelzSessionInfo.keepAlivesSent += 1; } catch (e) { - clearTimeout(keepaliveTimeoutTimer); - // The ping can't be sent because the session is already closed + // The ping can't be sent because the session is already closed, max outstanding pings reached, etc + clearTimeout(keepaliveTimeout); + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; + } + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to error sending ping frame ' + + (e instanceof Error ? e.message : 'unknown error') + ); session.destroy(); } }, this.keepaliveTimeMs); - keeapliveTimeTimer.unref?.(); + keepaliveInterval.unref?.(); } + session.once('goaway', (errorCode, lastStreamID, opaqueData) => { + if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by client due GOAWAY of ENHANCE_YOUR_CALM from ' + + clientAddress + ); + } else { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by client via GOAWAY with error code ' + + errorCode + + ' from ' + + clientAddress + ); + } + sessionClosedByServer = true; + session.destroy(); + }); + session.on('close', () => { if (!sessionClosedByServer) { this.channelzTrace.addTrace( @@ -1627,11 +1692,9 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keeapliveTimeTimer) { - clearInterval(keeapliveTimeTimer); - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } + if (keepaliveInterval) { + clearInterval(keepaliveInterval); + keepaliveInterval = null; } if (idleTimeoutObj !== null) { From 334f0dcdb5ecec9c7ece114f6d5cf8c7b0ca7ebc Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 14:58:59 -0700 Subject: [PATCH 02/25] remove comment --- packages/grpc-js/src/server.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index c4d87af3..aed65bbb 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1421,8 +1421,6 @@ export class Server { if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { keepaliveInterval = setInterval(() => { - // NOTE to self: document in PR that prior implementation would overwrite the prior pending timeout - // if the timeout had not occurred before the prior interval had elapsed (bad bug) const keepaliveTimeout = setTimeout(() => { if (keepaliveInterval) { clearInterval(keepaliveInterval); From d799a7a5bdd372cabe5e3fdadbaf728448c9045b Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:26:25 -0700 Subject: [PATCH 03/25] unify server and client keepalive matching comments and discussion on first round of review from https://github.com/grpc/grpc-node/pull/2760 --- packages/grpc-js/src/server.ts | 298 ++++++++++++++++-------------- packages/grpc-js/src/transport.ts | 105 +++++------ 2 files changed, 204 insertions(+), 199 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index aed65bbb..ae3ae29c 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -435,6 +435,14 @@ export class Server { ); } + private keepaliveTrace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + 'keepalive', + '(' + this.channelzRef.id + ') ' + text + ); + } + addProtoService(): never { throw new Error('Not implemented. Use addService() instead'); } @@ -1376,7 +1384,8 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keepaliveInterval: NodeJS.Timeout | null = null; + let keepaliveTimeout: NodeJS.Timeout | null = null; + let keepaliveDisabled = false; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1419,72 +1428,73 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keepaliveInterval = setInterval(() => { - const keepaliveTimeout = setTimeout(() => { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - sessionClosedByServer = true; - this.trace('Connection dropped by keepalive timeout'); - session.close(); - } - }, this.keepaliveTimeoutMs); - keepaliveTimeout.unref?.(); - - try { - if ( - !session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(keepaliveTimeout); - if (err) { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - sessionClosedByServer = true; - this.trace( - 'Connection dropped due to error with ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } - } - ) - ) { - throw new Error('Server keepalive ping send failed'); - } - } catch (e) { - // The ping can't be sent because the session is already closed, max outstanding pings reached, etc - clearTimeout(keepaliveTimeout); - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - this.trace( - 'Connection dropped due to error sending ping frame ' + - (e instanceof Error ? e.message : 'unknown error') - ); - session.destroy(); - } - }, this.keepaliveTimeMs); - keepaliveInterval.unref?.(); - } - - session.once('goaway', (errorCode, lastStreamID, opaqueData) => { - if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { - this.trace('Connection dropped by client due to ENHANCE_YOUR_CALM'); - } else { - this.trace( - 'Connection dropped by client via GOAWAY with error code ' + - errorCode - ); + const clearKeepaliveTimeout = () => { + if (keepaliveTimeout) { + clearTimeout(keepaliveTimeout); + keepaliveTimeout = null; } - sessionClosedByServer = true; - session.destroy(); - }); + }; + + const canSendPing = () => { + return ( + !keepaliveDisabled && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; + + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimeout.unref?.(); + }; + + const sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); + } + } + ); + + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); + sessionClosedByServer = true; + session.close(); + return; + } + + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimeout.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1501,10 +1511,8 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } + keepaliveDisabled = true; + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); @@ -1549,7 +1557,8 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keepaliveInterval: NodeJS.Timeout | null = null; + let keepaliveTimeout: NodeJS.Timeout | null = null; + let keepaliveDisabled = false; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1591,85 +1600,90 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keepaliveInterval = setInterval(() => { - const keepaliveTimeout = setTimeout(() => { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - sessionClosedByServer = true; + const clearKeepaliveTimeout = () => { + if (keepaliveTimeout) { + clearTimeout(keepaliveTimeout); + keepaliveTimeout = null; + } + }; + + const canSendPing = () => { + return ( + !keepaliveDisabled && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; + + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimeout.unref?.(); + }; + + const sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); this.channelzTrace.addTrace( 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration ); + sessionClosedByServer = true; session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); } - }, this.keepaliveTimeoutMs); - keepaliveTimeout.unref?.(); - - try { - if ( - !session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(keepaliveTimeout); - if (err) { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error with ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } - } - ) - ) { - throw new Error('Server keepalive ping send failed'); - } - channelzSessionInfo.keepAlivesSent += 1; - } catch (e) { - // The ping can't be sent because the session is already closed, max outstanding pings reached, etc - clearTimeout(keepaliveTimeout); - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error sending ping frame ' + - (e instanceof Error ? e.message : 'unknown error') - ); - session.destroy(); } - }, this.keepaliveTimeMs); - keepaliveInterval.unref?.(); - } + ); - session.once('goaway', (errorCode, lastStreamID, opaqueData) => { - if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); this.channelzTrace.addTrace( 'CT_INFO', - 'Connection dropped by client due GOAWAY of ENHANCE_YOUR_CALM from ' + - clientAddress - ); - } else { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by client via GOAWAY with error code ' + - errorCode + - ' from ' + - clientAddress + 'Connection dropped due failure to send ping frame' ); + sessionClosedByServer = true; + session.close(); + return; } - sessionClosedByServer = true; - session.destroy(); - }); + + channelzSessionInfo.keepAlivesSent += 1; + + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimeout.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1690,10 +1704,8 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } + keepaliveDisabled = true; + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 71d0f26b..b56cd5b8 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -101,28 +101,29 @@ class Http2Transport implements Transport { /** * The amount of time in between sending pings */ - private keepaliveTimeMs = -1; + private readonly keepaliveTimeMs: number; /** * The amount of time to wait for an acknowledgement after sending a ping */ - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + private readonly keepaliveTimeoutMs: number; /** - * Timer reference for timeout that indicates when to send the next ping + * Indicates whether keepalive pings should be sent without any active calls */ - private keepaliveTimerId: NodeJS.Timeout | null = null; + private readonly keepaliveWithoutCalls: boolean; + /** + * Timer reference indicating when to send the next ping or when the most recent ping will be considered lost. + */ + private keepaliveTimeout: NodeJS.Timeout | null = null; /** * Indicates that the keepalive timer ran out while there were no active * calls, and a ping should be sent the next time a call starts. */ private pendingSendKeepalivePing = false; /** - * Timer reference tracking when the most recent ping will be considered lost + * Indicates when keepalives should no longer be performed for this transport. Used to prevent a race where a + * latent session.ping(..) callback is called after the transport has been notified to disconnect. */ - private keepaliveTimeoutId: NodeJS.Timeout | null = null; - /** - * Indicates whether keepalive pings should be sent without any active calls - */ - private keepaliveWithoutCalls = false; + private keepaliveDisabled = false; private userAgent: string; @@ -182,9 +183,13 @@ class Http2Transport implements Transport { if ('grpc.keepalive_time_ms' in options) { this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; + } else { + this.keepaliveTimeMs = -1; } if ('grpc.keepalive_timeout_ms' in options) { this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } else { + this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS; } if ('grpc.keepalive_permit_without_calls' in options) { this.keepaliveWithoutCalls = @@ -195,7 +200,6 @@ class Http2Transport implements Transport { session.once('close', () => { this.trace('session closed'); - this.stopKeepalivePings(); this.handleDisconnect(); }); @@ -383,6 +387,8 @@ class Http2Transport implements Transport { * Handle connection drops, but not GOAWAYs. */ private handleDisconnect() { + this.keepaliveDisabled = true; + this.clearKeepaliveTimeout(); this.reportDisconnectToOwner(false); /* Give calls an event loop cycle to finish naturally before reporting the * disconnnection to them. */ @@ -397,63 +403,48 @@ class Http2Transport implements Transport { this.disconnectListeners.push(listener); } - private clearKeepaliveTimer() { - if (!this.keepaliveTimerId) { - return; - } - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; - } - - private clearKeepaliveTimeout() { - if (!this.keepaliveTimeoutId) { - return; - } - clearTimeout(this.keepaliveTimeoutId); - this.keepaliveTimeoutId = null; - } - private canSendPing() { return ( + !this.keepaliveDisabled && this.keepaliveTimeMs > 0 && (this.keepaliveWithoutCalls || this.activeCalls.size > 0) ); } private maybeSendPing() { - this.clearKeepaliveTimer(); if (!this.canSendPing()) { this.pendingSendKeepalivePing = true; return; } + if (this.keepaliveTimeout) { + console.error('keepaliveTimeout is not null'); + return; + } if (this.channelzEnabled) { this.keepalivesSent += 1; } this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - if (!this.keepaliveTimeoutId) { - this.keepaliveTimeoutId = setTimeout(() => { - this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(); - }, this.keepaliveTimeoutMs); - this.keepaliveTimeoutId.unref?.(); - } - try { - this.session!.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (err) { - this.keepaliveTrace('Ping failed with error ' + err.message); - this.handleDisconnect(); - } + this.keepaliveTimeout = setTimeout(() => { + this.keepaliveTrace('Ping timeout passed without response'); + this.handleDisconnect(); + }, this.keepaliveTimeoutMs); + this.keepaliveTimeout.unref?.(); + const pingSentSuccessfully = this.session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + this.clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error ' + err.message); + this.handleDisconnect(); + } else { this.keepaliveTrace('Received ping response'); - this.clearKeepaliveTimeout(); this.maybeStartKeepalivePingTimer(); } - ); - } catch (e) { - /* If we fail to send a ping, the connection is no longer functional, so - * we should discard it. */ + } + ); + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); this.handleDisconnect(); } } @@ -471,25 +462,27 @@ class Http2Transport implements Transport { if (this.pendingSendKeepalivePing) { this.pendingSendKeepalivePing = false; this.maybeSendPing(); - } else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) { + } else if (!this.keepaliveTimeout) { this.keepaliveTrace( 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); - this.keepaliveTimerId = setTimeout(() => { + this.keepaliveTimeout = setTimeout(() => { this.maybeSendPing(); }, this.keepaliveTimeMs); - this.keepaliveTimerId.unref?.(); + this.keepaliveTimeout.unref?.(); } /* Otherwise, there is already either a keepalive timer or a ping pending, * wait for those to resolve. */ } - private stopKeepalivePings() { - if (this.keepaliveTimerId) { - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; + /** + * Clears whichever keepalive timeout is currently active, if any. + */ + private clearKeepaliveTimeout() { + if (this.keepaliveTimeout) { + clearTimeout(this.keepaliveTimeout); + this.keepaliveTimeout = null; } - this.clearKeepaliveTimeout(); } private removeActiveCall(call: Http2SubchannelCall) { @@ -533,7 +526,7 @@ class Http2Transport implements Transport { * error here. */ try { - http2Stream = this.session!.request(headers); + http2Stream = this.session.request(headers); } catch (e) { this.handleDisconnect(); throw e; From 577b4b4748fbd12e2576e465d837f1ae320a096f Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:32:09 -0700 Subject: [PATCH 04/25] add keepalive server trace back in to match channelz vs non-channelz trace behavior --- packages/grpc-js/src/server.ts | 99 ++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 46 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index ae3ae29c..88846eaf 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -18,50 +18,10 @@ import * as http2 from 'http2'; import * as util from 'util'; +import { CipherNameAndProtocol, TLSSocket } from 'tls'; import { ServiceError } from './call'; -import { Status, LogVerbosity } from './constants'; -import { Deserialize, Serialize, ServiceDefinition } from './make-client'; -import { Metadata } from './metadata'; -import { - BidiStreamingHandler, - ClientStreamingHandler, - HandleCall, - Handler, - HandlerType, - sendUnaryData, - ServerDuplexStream, - ServerDuplexStreamImpl, - ServerReadableStream, - ServerStreamingHandler, - ServerUnaryCall, - ServerWritableStream, - ServerWritableStreamImpl, - UnaryHandler, - ServerErrorResponse, - ServerStatusResponse, - serverErrorToStatus, -} from './server-call'; -import { ServerCredentials } from './server-credentials'; +import { PartialStatusObject } from './call-interface'; import { ChannelOptions } from './channel-options'; -import { - createResolver, - ResolverListener, - mapUriDefaultScheme, -} from './resolver'; -import * as logging from './logging'; -import { - SubchannelAddress, - isTcpSubchannelAddress, - subchannelAddressToString, - stringToSubchannelAddress, -} from './subchannel-address'; -import { - GrpcUri, - combineHostPort, - parseUri, - splitHostPort, - uriToString, -} from './uri-parser'; import { ChannelzCallTracker, ChannelzCallTrackerStub, @@ -69,23 +29,63 @@ import { ChannelzChildrenTrackerStub, ChannelzTrace, ChannelzTraceStub, - registerChannelzServer, - registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, + registerChannelzServer, + registerChannelzSocket, unregisterChannelzRef, } from './channelz'; -import { CipherNameAndProtocol, TLSSocket } from 'tls'; +import { LogVerbosity, Status } from './constants'; +import * as logging from './logging'; +import { Deserialize, Serialize, ServiceDefinition } from './make-client'; +import { Metadata } from './metadata'; +import { + ResolverListener, + createResolver, + mapUriDefaultScheme, +} from './resolver'; +import { + BidiStreamingHandler, + ClientStreamingHandler, + HandleCall, + Handler, + HandlerType, + ServerDuplexStream, + ServerDuplexStreamImpl, + ServerErrorResponse, + ServerReadableStream, + ServerStatusResponse, + ServerStreamingHandler, + ServerUnaryCall, + ServerWritableStream, + ServerWritableStreamImpl, + UnaryHandler, + sendUnaryData, + serverErrorToStatus, +} from './server-call'; +import { ServerCredentials } from './server-credentials'; import { ServerInterceptingCallInterface, ServerInterceptor, getServerInterceptingCall, } from './server-interceptors'; -import { PartialStatusObject } from './call-interface'; +import { + SubchannelAddress, + isTcpSubchannelAddress, + stringToSubchannelAddress, + subchannelAddressToString, +} from './subchannel-address'; import { CallEventTracker } from './transport'; +import { + GrpcUri, + combineHostPort, + parseUri, + splitHostPort, + uriToString, +} from './uri-parser'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); @@ -1469,6 +1469,12 @@ export class Server { clearKeepaliveTimeout(); if (err) { this.keepaliveTrace('Ping failed with error: ' + err.message); + this.trace( + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration + ); sessionClosedByServer = true; session.close(); } else { @@ -1480,6 +1486,7 @@ export class Server { if (!pingSentSuccessfully) { this.keepaliveTrace('Ping failed to send'); + this.trace('Connection dropped due to failure to send ping frame'); sessionClosedByServer = true; session.close(); return; From 7883164137c4994af074201818a81108bda8204a Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:35:40 -0700 Subject: [PATCH 05/25] return imports back to original order --- packages/grpc-js/src/server.ts | 70 +++++++++++++++++----------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 88846eaf..ad463b95 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -18,67 +18,43 @@ import * as http2 from 'http2'; import * as util from 'util'; -import { CipherNameAndProtocol, TLSSocket } from 'tls'; import { ServiceError } from './call'; -import { PartialStatusObject } from './call-interface'; -import { ChannelOptions } from './channel-options'; -import { - ChannelzCallTracker, - ChannelzCallTrackerStub, - ChannelzChildrenTracker, - ChannelzChildrenTrackerStub, - ChannelzTrace, - ChannelzTraceStub, - ServerInfo, - ServerRef, - SocketInfo, - SocketRef, - TlsInfo, - registerChannelzServer, - registerChannelzSocket, - unregisterChannelzRef, -} from './channelz'; -import { LogVerbosity, Status } from './constants'; -import * as logging from './logging'; +import { Status, LogVerbosity } from './constants'; import { Deserialize, Serialize, ServiceDefinition } from './make-client'; import { Metadata } from './metadata'; -import { - ResolverListener, - createResolver, - mapUriDefaultScheme, -} from './resolver'; import { BidiStreamingHandler, ClientStreamingHandler, HandleCall, Handler, HandlerType, + sendUnaryData, ServerDuplexStream, ServerDuplexStreamImpl, - ServerErrorResponse, ServerReadableStream, - ServerStatusResponse, ServerStreamingHandler, ServerUnaryCall, ServerWritableStream, ServerWritableStreamImpl, UnaryHandler, - sendUnaryData, + ServerErrorResponse, + ServerStatusResponse, serverErrorToStatus, } from './server-call'; import { ServerCredentials } from './server-credentials'; +import { ChannelOptions } from './channel-options'; import { - ServerInterceptingCallInterface, - ServerInterceptor, - getServerInterceptingCall, -} from './server-interceptors'; + createResolver, + ResolverListener, + mapUriDefaultScheme, +} from './resolver'; +import * as logging from './logging'; import { SubchannelAddress, isTcpSubchannelAddress, - stringToSubchannelAddress, subchannelAddressToString, + stringToSubchannelAddress, } from './subchannel-address'; -import { CallEventTracker } from './transport'; import { GrpcUri, combineHostPort, @@ -86,6 +62,30 @@ import { splitHostPort, uriToString, } from './uri-parser'; +import { + ChannelzCallTracker, + ChannelzCallTrackerStub, + ChannelzChildrenTracker, + ChannelzChildrenTrackerStub, + ChannelzTrace, + ChannelzTraceStub, + registerChannelzServer, + registerChannelzSocket, + ServerInfo, + ServerRef, + SocketInfo, + SocketRef, + TlsInfo, + unregisterChannelzRef, +} from './channelz'; +import { CipherNameAndProtocol, TLSSocket } from 'tls'; +import { + ServerInterceptingCallInterface, + ServerInterceptor, + getServerInterceptingCall, +} from './server-interceptors'; +import { PartialStatusObject } from './call-interface'; +import { CallEventTracker } from './transport'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); From 19cdc1233c40b4586d29fc6454e95152af4661f6 Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:37:24 -0700 Subject: [PATCH 06/25] another missing trace message for parity --- packages/grpc-js/src/server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index ad463b95..9ed9b8d7 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1495,6 +1495,7 @@ export class Server { keepaliveTimeout = setTimeout(() => { clearKeepaliveTimeout(); this.keepaliveTrace('Ping timeout passed without response'); + this.trace('Connection dropped by keepalive timeout'); sessionClosedByServer = true; session.close(); }, this.keepaliveTimeoutMs); From bed5e85af9b5214c5827d9cb7c1465a8ee57ee7c Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:43:51 -0700 Subject: [PATCH 07/25] resolve hoisting --- packages/grpc-js/src/server.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 9ed9b8d7..2dc3c2f3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1623,6 +1623,9 @@ export class Server { ); }; + /* eslint-disable-next-line prefer-const */ + let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer + const maybeStartKeepalivePingTimer = () => { if (!canSendPing()) { return; @@ -1637,7 +1640,7 @@ export class Server { keepaliveTimeout.unref?.(); }; - const sendPing = () => { + sendPing = () => { if (!canSendPing()) { return; } From d325b5fff38a240cb3b43a70e0c7ce326c1f692e Mon Sep 17 00:00:00 2001 From: David Fiala Date: Tue, 28 May 2024 22:46:48 -0700 Subject: [PATCH 08/25] hoist in second location --- packages/grpc-js/src/server.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 2dc3c2f3..b749740d 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1443,6 +1443,9 @@ export class Server { ); }; + /* eslint-disable-next-line prefer-const */ + let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer + const maybeStartKeepalivePingTimer = () => { if (!canSendPing()) { return; @@ -1457,7 +1460,7 @@ export class Server { keepaliveTimeout.unref?.(); }; - const sendPing = () => { + sendPing = () => { if (!canSendPing()) { return; } From a77d94f7c62045253343349d6b140795b8db73a4 Mon Sep 17 00:00:00 2001 From: David Fiala Date: Wed, 29 May 2024 10:37:40 -0700 Subject: [PATCH 09/25] Based on grpc/grpc-node#2139 I wrapped http2session.ping in a try-catch block again --- packages/grpc-js/src/server.ts | 98 ++++++++++++++++++------------- packages/grpc-js/src/transport.ts | 33 +++++++---- 2 files changed, 78 insertions(+), 53 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index b749740d..8db0aac5 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1467,29 +1467,35 @@ export class Server { this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - const pingSentSuccessfully = session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearKeepaliveTimeout(); - if (err) { - this.keepaliveTrace('Ping failed with error: ' + err.message); - this.trace( - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - sessionClosedByServer = true; - session.close(); - } else { - this.keepaliveTrace('Received ping response'); - maybeStartKeepalivePingTimer(); + let pingSendError = ''; + try { + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); + } } + ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; } - ); + } catch (e) { + // grpc/grpc-node#2139 + pingSendError = + (e instanceof Error ? e.message : '') || 'Unknown error'; + } - if (!pingSentSuccessfully) { - this.keepaliveTrace('Ping failed to send'); - this.trace('Connection dropped due to failure to send ping frame'); + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); + this.trace( + 'Connection dropped due to ping send error: ' + pingSendError + ); sessionClosedByServer = true; session.close(); return; @@ -1650,32 +1656,42 @@ export class Server { this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - const pingSentSuccessfully = session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearKeepaliveTimeout(); - if (err) { - this.keepaliveTrace('Ping failed with error: ' + err.message); - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - sessionClosedByServer = true; - session.close(); - } else { - this.keepaliveTrace('Received ping response'); - maybeStartKeepalivePingTimer(); + let pingSendError = ''; + try { + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration + ); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); + } } + ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; } - ); + } catch (e) { + // grpc/grpc-node#2139 + pingSendError = + (e instanceof Error ? e.message : '') || 'Unknown error'; + } - if (!pingSentSuccessfully) { - this.keepaliveTrace('Ping failed to send'); + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); this.channelzTrace.addTrace( 'CT_INFO', - 'Connection dropped due failure to send ping frame' + 'Connection dropped due to ping send error: ' + pingSendError ); sessionClosedByServer = true; session.close(); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index b56cd5b8..a3f5a78f 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -431,20 +431,29 @@ class Http2Transport implements Transport { this.handleDisconnect(); }, this.keepaliveTimeoutMs); this.keepaliveTimeout.unref?.(); - const pingSentSuccessfully = this.session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - this.clearKeepaliveTimeout(); - if (err) { - this.keepaliveTrace('Ping failed with error ' + err.message); - this.handleDisconnect(); - } else { - this.keepaliveTrace('Received ping response'); - this.maybeStartKeepalivePingTimer(); + let pingSendError = ''; + try { + const pingSentSuccessfully = this.session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + this.clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error ' + err.message); + this.handleDisconnect(); + } else { + this.keepaliveTrace('Received ping response'); + this.maybeStartKeepalivePingTimer(); + } } + ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; } - ); - if (!pingSentSuccessfully) { - this.keepaliveTrace('Ping failed to send'); + } catch (e) { + // grpc/grpc-node#2139 + pingSendError = (e instanceof Error ? e.message : '') || 'Unknown error'; + } + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); this.handleDisconnect(); } } From c2da436a8e089c9168c8a430347819442e86ac6a Mon Sep 17 00:00:00 2001 From: David Fiala Date: Wed, 29 May 2024 15:09:55 -0700 Subject: [PATCH 10/25] remove keepaliveDisabled from server.ts. rename keepaliveTimer. --- packages/grpc-js/src/server.ts | 24 ++++++++++-------------- packages/grpc-js/src/transport.ts | 20 ++++++++++---------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 8db0aac5..33db4100 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1384,8 +1384,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keepaliveTimeout: NodeJS.Timeout | null = null; - let keepaliveDisabled = false; + let keepaliveTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1429,15 +1428,15 @@ export class Server { } const clearKeepaliveTimeout = () => { - if (keepaliveTimeout) { - clearTimeout(keepaliveTimeout); - keepaliveTimeout = null; + if (keepaliveTimer) { + clearTimeout(keepaliveTimer); + keepaliveTimer = null; } }; const canSendPing = () => { return ( - !keepaliveDisabled && + !session.destroyed && this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && this.keepaliveTimeMs > 0 ); @@ -1453,11 +1452,11 @@ export class Server { this.keepaliveTrace( 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); - keepaliveTimeout = setTimeout(() => { + keepaliveTimer = setTimeout(() => { clearKeepaliveTimeout(); sendPing(); }, this.keepaliveTimeMs); - keepaliveTimeout.unref?.(); + keepaliveTimer.unref?.(); }; sendPing = () => { @@ -1501,14 +1500,14 @@ export class Server { return; } - keepaliveTimeout = setTimeout(() => { + keepaliveTimer = setTimeout(() => { clearKeepaliveTimeout(); this.keepaliveTrace('Ping timeout passed without response'); this.trace('Connection dropped by keepalive timeout'); sessionClosedByServer = true; session.close(); }, this.keepaliveTimeoutMs); - keepaliveTimeout.unref?.(); + keepaliveTimer.unref?.(); }; maybeStartKeepalivePingTimer(); @@ -1528,7 +1527,6 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - keepaliveDisabled = true; clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { @@ -1575,7 +1573,6 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keepaliveTimeout: NodeJS.Timeout | null = null; - let keepaliveDisabled = false; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1626,7 +1623,7 @@ export class Server { const canSendPing = () => { return ( - !keepaliveDisabled && + !session.destroyed && this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && this.keepaliveTimeMs > 0 ); @@ -1734,7 +1731,6 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - keepaliveDisabled = true; clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index a3f5a78f..b7ac2df7 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -113,7 +113,7 @@ class Http2Transport implements Transport { /** * Timer reference indicating when to send the next ping or when the most recent ping will be considered lost. */ - private keepaliveTimeout: NodeJS.Timeout | null = null; + private keepaliveTimer: NodeJS.Timeout | null = null; /** * Indicates that the keepalive timer ran out while there were no active * calls, and a ping should be sent the next time a call starts. @@ -416,7 +416,7 @@ class Http2Transport implements Transport { this.pendingSendKeepalivePing = true; return; } - if (this.keepaliveTimeout) { + if (this.keepaliveTimer) { console.error('keepaliveTimeout is not null'); return; } @@ -426,11 +426,11 @@ class Http2Transport implements Transport { this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - this.keepaliveTimeout = setTimeout(() => { + this.keepaliveTimer = setTimeout(() => { this.keepaliveTrace('Ping timeout passed without response'); this.handleDisconnect(); }, this.keepaliveTimeoutMs); - this.keepaliveTimeout.unref?.(); + this.keepaliveTimer.unref?.(); let pingSendError = ''; try { const pingSentSuccessfully = this.session.ping( @@ -471,14 +471,14 @@ class Http2Transport implements Transport { if (this.pendingSendKeepalivePing) { this.pendingSendKeepalivePing = false; this.maybeSendPing(); - } else if (!this.keepaliveTimeout) { + } else if (!this.keepaliveTimer) { this.keepaliveTrace( 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); - this.keepaliveTimeout = setTimeout(() => { + this.keepaliveTimer = setTimeout(() => { this.maybeSendPing(); }, this.keepaliveTimeMs); - this.keepaliveTimeout.unref?.(); + this.keepaliveTimer.unref?.(); } /* Otherwise, there is already either a keepalive timer or a ping pending, * wait for those to resolve. */ @@ -488,9 +488,9 @@ class Http2Transport implements Transport { * Clears whichever keepalive timeout is currently active, if any. */ private clearKeepaliveTimeout() { - if (this.keepaliveTimeout) { - clearTimeout(this.keepaliveTimeout); - this.keepaliveTimeout = null; + if (this.keepaliveTimer) { + clearTimeout(this.keepaliveTimer); + this.keepaliveTimer = null; } } From 7719e37c8310bc4a278078ae5b018445454ae006 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 5 Jun 2024 17:55:56 -0700 Subject: [PATCH 11/25] grpc-js: Fix client hang when receiving extra messages for a unary response --- packages/grpc-js/src/client.ts | 9 +- packages/grpc-js/test/test-server-errors.ts | 92 +++++++++++++++++++++ 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 995d5b32..dc75ac48 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -330,7 +330,7 @@ export class Client { // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { if (responseMessage !== null) { - call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); + call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received'); } responseMessage = message; }, @@ -345,7 +345,7 @@ export class Client { callProperties.callback!( callErrorFromStatus( { - code: Status.INTERNAL, + code: Status.UNIMPLEMENTED, details: 'No message received', metadata: status.metadata, }, @@ -463,9 +463,10 @@ export class Client { // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { if (responseMessage !== null) { - call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); + call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received'); } responseMessage = message; + call.startRead(); }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { @@ -478,7 +479,7 @@ export class Client { callProperties.callback!( callErrorFromStatus( { - code: Status.INTERNAL, + code: Status.UNIMPLEMENTED, details: 'No message received', metadata: status.metadata, }, diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 24ccfeef..7367a907 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -286,6 +286,98 @@ describe('Server serialization failure handling', () => { }); }); +describe('Cardinality violations', () => { + let client: ServiceClient; + let server: Server; + let responseCount: number = 1; + const testMessage = Buffer.from([]); + before(done => { + const serverServiceDefinition = { + testMethod: { + path: '/TestService/TestMethod/', + requestStream: false, + responseStream: true, + requestSerialize: identity, + requestDeserialize: identity, + responseDeserialize: identity, + responseSerialize: identity + } + }; + const clientServiceDefinition = { + testMethod: { + path: '/TestService/TestMethod/', + requestStream: true, + responseStream: false, + requestSerialize: identity, + requestDeserialize: identity, + responseDeserialize: identity, + responseSerialize: identity + } + }; + const TestClient = grpc.makeClientConstructor(clientServiceDefinition, 'TestService'); + server = new grpc.Server(); + server.addService(serverServiceDefinition, { + testMethod(stream: ServerWritableStream) { + for (let i = 0; i < responseCount; i++) { + stream.write(testMessage); + } + stream.end(); + } + }); + server.bindAsync('localhost:0', serverInsecureCreds, (error, port) => { + assert.ifError(error); + client = new TestClient(`localhost:${port}`, clientInsecureCreds); + done(); + }); + }); + beforeEach(() => { + responseCount = 1; + }); + after(done => { + client.close(); + server.tryShutdown(done); + }); + it('Should fail if the client sends too few messages', done => { + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.end(); + }); + it('Should fail if the client sends too many messages', done => { + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.write(testMessage); + call.end(); + }); + it('Should fail if the server sends too few messages', done => { + responseCount = 0; + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.end(); + }); + it('Should fail if the server sends too many messages', done => { + responseCount = 2; + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.end(); + }); + +}); + describe('Other conditions', () => { let client: ServiceClient; let server: Server; From 3c5ab229b1838f624ace174eef850f7cb611df9e Mon Sep 17 00:00:00 2001 From: David Fiala Date: Wed, 5 Jun 2024 19:01:02 -0700 Subject: [PATCH 12/25] per discussion, avoid tracking keepalive disabled state and instead depend on whether the session is destroyed --- packages/grpc-js/src/transport.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index b7ac2df7..ab543821 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -119,11 +119,6 @@ class Http2Transport implements Transport { * calls, and a ping should be sent the next time a call starts. */ private pendingSendKeepalivePing = false; - /** - * Indicates when keepalives should no longer be performed for this transport. Used to prevent a race where a - * latent session.ping(..) callback is called after the transport has been notified to disconnect. - */ - private keepaliveDisabled = false; private userAgent: string; @@ -387,7 +382,6 @@ class Http2Transport implements Transport { * Handle connection drops, but not GOAWAYs. */ private handleDisconnect() { - this.keepaliveDisabled = true; this.clearKeepaliveTimeout(); this.reportDisconnectToOwner(false); /* Give calls an event loop cycle to finish naturally before reporting the @@ -396,6 +390,7 @@ class Http2Transport implements Transport { for (const call of this.activeCalls) { call.onDisconnect(); } + this.session.destroy(); }); } @@ -405,7 +400,7 @@ class Http2Transport implements Transport { private canSendPing() { return ( - !this.keepaliveDisabled && + !this.session.destroyed && this.keepaliveTimeMs > 0 && (this.keepaliveWithoutCalls || this.activeCalls.size > 0) ); From e64d816d7df6d6cde62314beb67d11f1e0a8c79e Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 5 Jun 2024 11:22:23 -0700 Subject: [PATCH 13/25] grpc-js: Avoid buffering significantly more than max_receive_message_size per received message (1.10.x) --- packages/grpc-js/src/compression-filter.ts | 67 ++++++++++---- packages/grpc-js/src/internal-channel.ts | 2 - .../grpc-js/src/max-message-size-filter.ts | 88 ------------------- packages/grpc-js/src/server-interceptors.ts | 88 ++++++++++++------- packages/grpc-js/src/stream-decoder.ts | 5 ++ packages/grpc-js/src/subchannel-call.ts | 14 ++- packages/grpc-js/src/transport.ts | 7 +- .../grpc-js/test/fixtures/test_service.proto | 1 + packages/grpc-js/test/test-server-errors.ts | 49 ++++++++++- 9 files changed, 173 insertions(+), 148 deletions(-) delete mode 100644 packages/grpc-js/src/max-message-size-filter.ts diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index 136311ad..f1600b36 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; -import { LogVerbosity } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler { } class DeflateHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.deflate(message, (err, output) => { @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.inflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createInflate(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } class GzipHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.gzip(message, (err, output) => { @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.unzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createGunzip(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler { } } -function getCompressionHandler(compressionName: string): CompressionHandler { +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler { switch (compressionName) { case 'identity': return new IdentityHandler(); case 'deflate': - return new DeflateHandler(); + return new DeflateHandler(maxReceiveMessageSize); case 'gzip': - return new GzipHandler(); + return new GzipHandler(maxReceiveMessageSize); default: return new UnknownHandler(compressionName); } @@ -186,6 +218,7 @@ export class CompressionFilter extends BaseFilter implements Filter { private sendCompression: CompressionHandler = new IdentityHandler(); private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; + private maxReceiveMessageLength: number; constructor( channelOptions: ChannelOptions, @@ -195,6 +228,7 @@ export class CompressionFilter extends BaseFilter implements Filter { const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; + this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[ @@ -215,7 +249,8 @@ export class CompressionFilter extends BaseFilter implements Filter { ) { this.currentCompressionAlgorithm = clientSelectedEncoding; this.sendCompression = getCompressionHandler( - this.currentCompressionAlgorithm + this.currentCompressionAlgorithm, + -1 ); } } else { @@ -247,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter { if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { - this.receiveCompression = getCompressionHandler(encoding); + this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength); } } metadata.remove('grpc-encoding'); diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 469ace55..e0cebd46 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -33,7 +33,6 @@ import { } from './resolver'; import { trace } from './logging'; import { SubchannelAddress } from './subchannel-address'; -import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; @@ -402,7 +401,6 @@ export class InternalChannel { } ); this.filterStackFactory = new FilterStackFactory([ - new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this, this.options), ]); this.trace( diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts deleted file mode 100644 index b6df374b..00000000 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { WriteObject } from './call-interface'; -import { - Status, - DEFAULT_MAX_SEND_MESSAGE_LENGTH, - DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, -} from './constants'; -import { ChannelOptions } from './channel-options'; -import { Metadata } from './metadata'; - -export class MaxMessageSizeFilter extends BaseFilter implements Filter { - private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; - constructor(options: ChannelOptions) { - super(); - if ('grpc.max_send_message_length' in options) { - this.maxSendMessageSize = options['grpc.max_send_message_length']!; - } - if ('grpc.max_receive_message_length' in options) { - this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; - } - } - - async sendMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxSendMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.message.length > this.maxSendMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`, - metadata: new Metadata(), - }; - } else { - return concreteMessage; - } - } - } - - async receiveMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxReceiveMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.length > this.maxReceiveMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`, - metadata: new Metadata(), - }; - } else { - return concreteMessage; - } - } - } -} - -export class MaxMessageSizeFilterFactory - implements FilterFactory -{ - constructor(private readonly options: ChannelOptions) {} - - createFilter(): MaxMessageSizeFilter { - return new MaxMessageSizeFilter(this.options); - } -} diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index b62d5510..c2d985a6 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -30,14 +30,10 @@ import { import * as http2 from 'http2'; import { getErrorMessage } from './error'; import * as zlib from 'zlib'; -import { promisify } from 'util'; import { StreamDecoder } from './stream-decoder'; import { CallEventTracker } from './transport'; import * as logging from './logging'; -const unzip = promisify(zlib.unzip); -const inflate = promisify(zlib.inflate); - const TRACER_NAME = 'server_call'; function trace(text: string) { @@ -496,7 +492,7 @@ export class BaseServerInterceptingCall private wantTrailers = false; private cancelNotified = false; private incomingEncoding = 'identity'; - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private readQueue: ReadQueueEntry[] = []; private isReadPending = false; private receivedHalfClose = false; @@ -554,6 +550,8 @@ export class BaseServerInterceptingCall this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; } + this.decoder = new StreamDecoder(this.maxReceiveMessageSize); + const metadata = Metadata.fromHttp2Headers(headers); if (logging.isTracerEnabled(TRACER_NAME)) { @@ -674,18 +672,41 @@ export class BaseServerInterceptingCall message: Buffer, encoding: string ): Buffer | Promise { - switch (encoding) { - case 'deflate': - return inflate(message.subarray(5)); - case 'gzip': - return unzip(message.subarray(5)); - case 'identity': - return message.subarray(5); - default: - return Promise.reject({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, + const messageContents = message.subarray(5); + if (encoding === 'identity') { + return messageContents; + } else if (encoding === 'deflate' || encoding === 'gzip') { + let decompresser: zlib.Gunzip | zlib.Deflate; + if (encoding === 'deflate') { + decompresser = zlib.createInflate(); + } else { + decompresser = zlib.createGunzip(); + } + return new Promise((resolve, reject) => { + let totalLength = 0 + const messageParts: Buffer[] = []; + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}` + }); + } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(messageContents); + decompresser.end(); + }); + } else { + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } } @@ -698,10 +719,16 @@ export class BaseServerInterceptingCall const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; - const decompressedMessage = await this.decompressMessage( - queueEntry.compressedMessage!, - compressedMessageEncoding - ); + let decompressedMessage: Buffer; + try { + decompressedMessage = await this.decompressMessage( + queueEntry.compressedMessage!, + compressedMessageEncoding + ); + } catch (err) { + this.sendStatus(err as PartialStatusObject); + return; + } try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { @@ -743,23 +770,16 @@ export class BaseServerInterceptingCall ' received data frame of size ' + data.length ); - const rawMessages = this.decoder.write(data); + let rawMessages: Buffer[]; + try { + rawMessages = this.decoder.write(data); + } catch (e) { + this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, details: (e as Error).message }); + return; + } for (const messageBytes of rawMessages) { this.stream.pause(); - if ( - this.maxReceiveMessageSize !== -1 && - messageBytes.length - 5 > this.maxReceiveMessageSize - ) { - this.sendStatus({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${ - messageBytes.length - 5 - } vs. ${this.maxReceiveMessageSize})`, - metadata: null, - }); - return; - } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', compressedMessage: messageBytes, diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 671ad41a..ea669d14 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -30,6 +30,8 @@ export class StreamDecoder { private readPartialMessage: Buffer[] = []; private readMessageRemaining = 0; + constructor(private maxReadMessageLength: number) {} + write(data: Buffer): Buffer[] { let readHead = 0; let toRead: number; @@ -60,6 +62,9 @@ export class StreamDecoder { // readSizeRemaining >=0 here if (this.readSizeRemaining === 0) { this.readMessageSize = this.readPartialSize.readUInt32BE(0); + if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) { + throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`); + } this.readMessageRemaining = this.readMessageSize; if (this.readMessageRemaining > 0) { this.readState = ReadState.READING_MESSAGE; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index 0ce7d72c..bee00119 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -18,7 +18,7 @@ import * as http2 from 'http2'; import * as os from 'os'; -import { Status } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import * as logging from './logging'; @@ -116,7 +116,7 @@ function mapHttpStatusCode(code: number): StatusObject { } export class Http2SubchannelCall implements SubchannelCall { - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private isReadFilterPending = false; private isPushPending = false; @@ -147,6 +147,8 @@ export class Http2SubchannelCall implements SubchannelCall { private readonly transport: Transport, private readonly callId: number ) { + const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.decoder = new StreamDecoder(maxReceiveMessageLength); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -182,7 +184,13 @@ export class Http2SubchannelCall implements SubchannelCall { return; } this.trace('receive HTTP/2 data frame of length ' + data.length); - const messages = this.decoder.write(data); + let messages: Buffer[]; + try { + messages = this.decoder.write(data); + } catch (e) { + this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message); + return; + } for (const message of messages) { this.trace('parsed message of length ' + message.length); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 66a5d455..934b6211 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -84,6 +84,7 @@ export interface TransportDisconnectListener { export interface Transport { getChannelzRef(): SocketRef; getPeerName(): string; + getOptions(): ChannelOptions; createCall( metadata: Metadata, host: string, @@ -147,7 +148,7 @@ class Http2Transport implements Transport { constructor( private session: http2.ClientHttp2Session, subchannelAddress: SubchannelAddress, - options: ChannelOptions, + private options: ChannelOptions, /** * Name of the remote server, if it is not the same as the subchannel * address, i.e. if connecting through an HTTP CONNECT proxy. @@ -617,6 +618,10 @@ class Http2Transport implements Transport { return this.subchannelAddressString; } + getOptions() { + return this.options; + } + shutdown() { this.session.close(); unregisterChannelzRef(this.channelzRef); diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto index 64ce0d37..2a7a303f 100644 --- a/packages/grpc-js/test/fixtures/test_service.proto +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -21,6 +21,7 @@ message Request { bool error = 1; string message = 2; int32 errorAfter = 3; + int32 responseLength = 4; } message Response { diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 24ccfeef..243e1091 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -33,6 +33,7 @@ import { } from '../src/server-call'; import { loadProtoFile } from './common'; +import { CompressionAlgorithms } from '../src/compression-algorithms'; const protoFile = join(__dirname, 'fixtures', 'test_service.proto'); const testServiceDef = loadProtoFile(protoFile); @@ -310,7 +311,7 @@ describe('Other conditions', () => { trailerMetadata ); } else { - cb(null, { count: 1 }, trailerMetadata); + cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata); } }, @@ -320,6 +321,7 @@ describe('Other conditions', () => { ) { let count = 0; let errored = false; + let responseLength = 0; stream.on('data', (data: any) => { if (data.error) { @@ -327,13 +329,14 @@ describe('Other conditions', () => { errored = true; cb(new Error(message) as ServiceError, null, trailerMetadata); } else { + responseLength += data.responseLength; count++; } }); stream.on('end', () => { if (!errored) { - cb(null, { count }, trailerMetadata); + cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata); } }); }, @@ -349,7 +352,7 @@ describe('Other conditions', () => { }); } else { for (let i = 1; i <= 5; i++) { - stream.write({ count: i }); + stream.write({ count: i, message: 'a'.repeat(req.responseLength) }); if (req.errorAfter && req.errorAfter === i) { stream.emit('error', { code: grpc.status.UNKNOWN, @@ -376,7 +379,7 @@ describe('Other conditions', () => { err.metadata.add('count', '' + count); stream.emit('error', err); } else { - stream.write({ count }); + stream.write({ count, message: 'a'.repeat(data.responseLength) }); count++; } }); @@ -740,6 +743,44 @@ describe('Other conditions', () => { }); }); }); + + describe('Max message size', () => { + const largeMessage = 'a'.repeat(10_000_000); + it('Should be enforced on the server', done => { + client.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + it('Should be enforced on the client', done => { + client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + describe('Compressed messages', () => { + it('Should be enforced with gzip', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + it('Should be enforced with deflate', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + }); + }); }); function identity(arg: any): any { From 98cd87f7512c95cd48bf04c3225f0fa22b5dcb78 Mon Sep 17 00:00:00 2001 From: David Fiala Date: Thu, 6 Jun 2024 22:57:13 -0700 Subject: [PATCH 14/25] ensure that client keepalive timers are always cleared when they trigger. this is a necessary change to fit with having removed keepaliveDisabled boolean. manually inspected test logs for both server.ts and transport.ts to verify both types of keepalives are operating correctly. --- packages/grpc-js/src/transport.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index ab543821..06509cb1 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -422,6 +422,7 @@ class Http2Transport implements Transport { 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); this.keepaliveTimer = setTimeout(() => { + this.keepaliveTimer = null; this.keepaliveTrace('Ping timeout passed without response'); this.handleDisconnect(); }, this.keepaliveTimeoutMs); @@ -471,6 +472,7 @@ class Http2Transport implements Transport { 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); this.keepaliveTimer = setTimeout(() => { + this.keepaliveTimer = null; this.maybeSendPing(); }, this.keepaliveTimeMs); this.keepaliveTimer.unref?.(); From 7ecaa2d2dcaaa49467d41143169212caf55a40cd Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 7 Jun 2024 10:52:50 -0700 Subject: [PATCH 15/25] grpc-js: Bump to 1.10.9 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index f8b07035..73b63bf7 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.10.8", + "version": "1.10.9", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From e7590295327b1bbd6dae812cf45f4c5c5b181985 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Fri, 10 May 2024 16:17:06 -0400 Subject: [PATCH 16/25] HTTP CONNECT: handle early server packets --- packages/grpc-js/src/http_proxy.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/grpc-js/src/http_proxy.ts b/packages/grpc-js/src/http_proxy.ts index 3e905c48..30eeb621 100644 --- a/packages/grpc-js/src/http_proxy.ts +++ b/packages/grpc-js/src/http_proxy.ts @@ -233,6 +233,12 @@ export function getProxiedConnection( ' through proxy ' + proxyAddressString ); + // The HTTP client may have already read a few bytes of the proxied + // connection. If that's the case, put them back into the socket. + // See https://github.com/grpc/grpc-node/issues/2744. + if (head.length > 0) { + socket.unshift(head); + } if ('secureContext' in connectionOptions) { /* The proxy is connecting to a TLS server, so upgrade this socket * connection to a TLS connection. From 5ae551445452ccf1db1d4ed8f3890f6b0dfc0c35 Mon Sep 17 00:00:00 2001 From: Brendan Myers Date: Wed, 29 May 2024 19:15:30 +1000 Subject: [PATCH 17/25] fix: add decoding for url encoded user credentials --- packages/grpc-js/src/http_proxy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/src/http_proxy.ts b/packages/grpc-js/src/http_proxy.ts index 30eeb621..6fabf502 100644 --- a/packages/grpc-js/src/http_proxy.ts +++ b/packages/grpc-js/src/http_proxy.ts @@ -80,7 +80,7 @@ function getProxyInfo(): ProxyInfo { if (proxyUrl.username) { if (proxyUrl.password) { log(LogVerbosity.INFO, 'userinfo found in proxy URI'); - userCred = `${proxyUrl.username}:${proxyUrl.password}`; + userCred = decodeURIComponent(`${proxyUrl.username}:${proxyUrl.password}`); } else { userCred = proxyUrl.username; } From cbab4e51cdecbc1cff1d27c46d29ae915c4554fb Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 18 Jun 2024 15:42:59 -0700 Subject: [PATCH 18/25] grpc-js: Bump to 1.10.10 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 73b63bf7..774a0eaf 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.10.9", + "version": "1.10.10", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From 42844cffd2d5f27eb7dbac7b5da1dd05b3de5a95 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 20 Jun 2024 13:15:12 -0700 Subject: [PATCH 19/25] grpc-js: Re-add client-side max send message size checking --- packages/grpc-js/src/compression-filter.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index f1600b36..189749f0 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; -import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; @@ -219,6 +219,7 @@ export class CompressionFilter extends BaseFilter implements Filter { private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; private maxReceiveMessageLength: number; + private maxSendMessageLength: number; constructor( channelOptions: ChannelOptions, @@ -228,7 +229,8 @@ export class CompressionFilter extends BaseFilter implements Filter { const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; - this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH + this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.maxSendMessageLength = channelOptions['grpc.max_send_message_length'] ?? DEFAULT_MAX_SEND_MESSAGE_LENGTH; if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[ @@ -314,6 +316,12 @@ export class CompressionFilter extends BaseFilter implements Filter { * and the output is a framed and possibly compressed message. For this * reason, this filter should be at the bottom of the filter stack */ const resolvedMessage: WriteObject = await message; + if (this.maxSendMessageLength !== -1 && resolvedMessage.message.length > this.maxSendMessageLength) { + throw { + code: Status.RESOURCE_EXHAUSTED, + details: `Attempted to send message with a size larger than ${this.maxSendMessageLength}` + }; + } let compress: boolean; if (this.sendCompression instanceof IdentityHandler) { compress = false; From c1815e09e2ae44bfa00d1664433a2c9c80fed179 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 3 Jul 2024 15:22:03 -0700 Subject: [PATCH 20/25] grpc-js: Fix pick_first reconnecting without active calls --- packages/grpc-js/src/load-balancer-pick-first.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index f6c43b33..e0885333 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -32,7 +32,7 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { Endpoint, SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { @@ -348,7 +348,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { if (newState !== ConnectivityState.READY) { this.removeCurrentPick(); this.calculateAndReportNewState(); - this.requestReresolution(); } return; } @@ -483,6 +482,13 @@ export class PickFirstLoadBalancer implements LoadBalancer { subchannel: this.channelControlHelper.createSubchannel(address, {}), hasReportedTransientFailure: false, })); + trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])'); + for (const { subchannel } of newChildrenList) { + if (subchannel.getConnectivityState() === ConnectivityState.READY) { + this.pickSubchannel(subchannel); + return; + } + } /* Ref each subchannel before resetting the list, to ensure that * subchannels shared between the list don't drop to 0 refs during the * transition. */ @@ -527,6 +533,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { const rawAddressList = ([] as SubchannelAddress[]).concat( ...endpointList.map(endpoint => endpoint.addresses) ); + trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])'); if (rawAddressList.length === 0) { throw new Error('No addresses in endpoint list passed to pick_first'); } From e804ad65b652c90ba0c9223892ac48827879c97d Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 3 Jul 2024 15:37:40 -0700 Subject: [PATCH 21/25] grpc-js: Bump to 1.10.11 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 774a0eaf..23af2f98 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.10.10", + "version": "1.10.11", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From a5fac6f0565e147cdc1cb3376e9509ef4d503e2e Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 8 Jul 2024 15:08:30 -0700 Subject: [PATCH 22/25] grpc-js: pick-first: Fix short circuit READY subchannel handling --- packages/grpc-js/src/load-balancer-pick-first.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index e0885333..e042e116 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -485,6 +485,8 @@ export class PickFirstLoadBalancer implements LoadBalancer { trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])'); for (const { subchannel } of newChildrenList) { if (subchannel.getConnectivityState() === ConnectivityState.READY) { + this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); + subchannel.addConnectivityStateListener(this.subchannelStateListener); this.pickSubchannel(subchannel); return; } @@ -500,10 +502,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.children = newChildrenList; for (const { subchannel } of this.children) { subchannel.addConnectivityStateListener(this.subchannelStateListener); - if (subchannel.getConnectivityState() === ConnectivityState.READY) { - this.pickSubchannel(subchannel); - return; - } } for (const child of this.children) { if ( From 745a451e4c1d2d8583e92cbc86cc9e5eee0b3c95 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Jul 2024 11:03:15 -0700 Subject: [PATCH 23/25] grpc-js: Increase state change deadline in server idle tests --- packages/grpc-js/test/test-idle-timer.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/test/test-idle-timer.ts b/packages/grpc-js/test/test-idle-timer.ts index 3f2a8ed2..ed6af2cf 100644 --- a/packages/grpc-js/test/test-idle-timer.ts +++ b/packages/grpc-js/test/test-idle-timer.ts @@ -199,7 +199,7 @@ describe('Server idle timer', () => { grpc.connectivityState.READY ); client?.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, done ); @@ -217,7 +217,7 @@ describe('Server idle timer', () => { ); client!.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, err => { if (err) return done(err); @@ -248,7 +248,7 @@ describe('Server idle timer', () => { ); client!.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, done ); From 395de4b333840fa28c7b54e726a017fee5c89c0d Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Jul 2024 11:23:19 -0700 Subject: [PATCH 24/25] grpc-js: Refresh server idle timer if not enough time has passed --- packages/grpc-js/src/server.ts | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 33db4100..4683d44a 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1790,19 +1790,22 @@ export class Server { // for future refreshes if ( sessionInfo !== undefined && - sessionInfo.activeStreams === 0 && - Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout + sessionInfo.activeStreams === 0 ) { - ctx.trace( - 'Session idle timeout triggered for ' + - socket?.remoteAddress + - ':' + - socket?.remotePort + - ' last idle at ' + - sessionInfo.lastIdle - ); + if (Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout) { + ctx.trace( + 'Session idle timeout triggered for ' + + socket?.remoteAddress + + ':' + + socket?.remotePort + + ' last idle at ' + + sessionInfo.lastIdle + ); - ctx.closeSession(session); + ctx.closeSession(session); + } else { + sessionInfo.timeout.refresh(); + } } } From 810e9e6a40f586edb33f1fc4017495a000875839 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Jul 2024 15:14:44 -0700 Subject: [PATCH 25/25] grpc-js: Ensure pending calls end after channel close --- packages/grpc-js/src/internal-channel.ts | 31 ++++++++++++++++++++++-- packages/grpc-js/test/test-client.ts | 30 +++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index e0cebd46..857f2a4e 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ResolvingLoadBalancer } from './resolving-load-balancer'; import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; -import { UnavailablePicker, Picker, QueuePicker } from './picker'; +import { UnavailablePicker, Picker, QueuePicker, PickArgs, PickResult, PickResultType } from './picker'; import { Metadata } from './metadata'; import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; @@ -143,6 +143,22 @@ class ChannelSubchannelWrapper } } +class ShutdownPicker implements Picker { + pick(pickArgs: PickArgs): PickResult { + return { + pickResultType: PickResultType.DROP, + status: { + code: Status.UNAVAILABLE, + details: 'Channel closed before call started', + metadata: new Metadata() + }, + subchannel: null, + onCallStarted: null, + onCallEnded: null + } + } +} + export class InternalChannel { private readonly resolvingLoadBalancer: ResolvingLoadBalancer; private readonly subchannelPool: SubchannelPool; @@ -536,7 +552,9 @@ export class InternalChannel { } getConfig(method: string, metadata: Metadata): GetConfigResult { - this.resolvingLoadBalancer.exitIdle(); + if (this.connectivityState !== ConnectivityState.SHUTDOWN) { + this.resolvingLoadBalancer.exitIdle(); + } if (this.configSelector) { return { type: 'SUCCESS', @@ -745,6 +763,15 @@ export class InternalChannel { close() { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); + this.currentPicker = new ShutdownPicker(); + for (const call of this.configSelectionQueue) { + call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started'); + } + this.configSelectionQueue = []; + for (const call of this.pickQueue) { + call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started'); + } + this.pickQueue = []; clearInterval(this.callRefTimer); if (this.idleTimer) { clearTimeout(this.idleTimer); diff --git a/packages/grpc-js/test/test-client.ts b/packages/grpc-js/test/test-client.ts index 67b39601..bbb3f063 100644 --- a/packages/grpc-js/test/test-client.ts +++ b/packages/grpc-js/test/test-client.ts @@ -97,6 +97,21 @@ describe('Client without a server', () => { } ); }); + it('close should force calls to end', done => { + client.makeUnaryRequest( + '/service/method', + x => x, + x => x, + Buffer.from([]), + new grpc.Metadata({waitForReady: true}), + (error, value) => { + assert(error); + assert.strictEqual(error?.code, grpc.status.UNAVAILABLE); + done(); + } + ); + client.close(); + }); }); describe('Client with a nonexistent target domain', () => { @@ -133,4 +148,19 @@ describe('Client with a nonexistent target domain', () => { } ); }); + it('close should force calls to end', done => { + client.makeUnaryRequest( + '/service/method', + x => x, + x => x, + Buffer.from([]), + new grpc.Metadata({waitForReady: true}), + (error, value) => { + assert(error); + assert.strictEqual(error?.code, grpc.status.UNAVAILABLE); + done(); + } + ); + client.close(); + }); });