diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index aed65bbb..ae3ae29c 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -435,6 +435,14 @@ export class Server { ); } + private keepaliveTrace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + 'keepalive', + '(' + this.channelzRef.id + ') ' + text + ); + } + addProtoService(): never { throw new Error('Not implemented. Use addService() instead'); } @@ -1376,7 +1384,8 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keepaliveInterval: NodeJS.Timeout | null = null; + let keepaliveTimeout: NodeJS.Timeout | null = null; + let keepaliveDisabled = false; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1419,72 +1428,73 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keepaliveInterval = setInterval(() => { - const keepaliveTimeout = setTimeout(() => { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - sessionClosedByServer = true; - this.trace('Connection dropped by keepalive timeout'); - session.close(); - } - }, this.keepaliveTimeoutMs); - keepaliveTimeout.unref?.(); - - try { - if ( - !session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(keepaliveTimeout); - if (err) { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - sessionClosedByServer = true; - this.trace( - 'Connection dropped due to error with ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } - } - ) - ) { - throw new Error('Server keepalive ping send failed'); - } - } catch (e) { - // The ping can't be sent because the session is already closed, max outstanding pings reached, etc - clearTimeout(keepaliveTimeout); - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - this.trace( - 'Connection dropped due to error sending ping frame ' + - (e instanceof Error ? e.message : 'unknown error') - ); - session.destroy(); - } - }, this.keepaliveTimeMs); - keepaliveInterval.unref?.(); - } - - session.once('goaway', (errorCode, lastStreamID, opaqueData) => { - if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { - this.trace('Connection dropped by client due to ENHANCE_YOUR_CALM'); - } else { - this.trace( - 'Connection dropped by client via GOAWAY with error code ' + - errorCode - ); + const clearKeepaliveTimeout = () => { + if (keepaliveTimeout) { + clearTimeout(keepaliveTimeout); + keepaliveTimeout = null; } - sessionClosedByServer = true; - session.destroy(); - }); + }; + + const canSendPing = () => { + return ( + !keepaliveDisabled && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; + + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimeout.unref?.(); + }; + + const sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); + } + } + ); + + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); + sessionClosedByServer = true; + session.close(); + return; + } + + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimeout.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1501,10 +1511,8 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } + keepaliveDisabled = true; + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); @@ -1549,7 +1557,8 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keepaliveInterval: NodeJS.Timeout | null = null; + let keepaliveTimeout: NodeJS.Timeout | null = null; + let keepaliveDisabled = false; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1591,85 +1600,90 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keepaliveInterval = setInterval(() => { - const keepaliveTimeout = setTimeout(() => { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - sessionClosedByServer = true; + const clearKeepaliveTimeout = () => { + if (keepaliveTimeout) { + clearTimeout(keepaliveTimeout); + keepaliveTimeout = null; + } + }; + + const canSendPing = () => { + return ( + !keepaliveDisabled && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; + + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimeout.unref?.(); + }; + + const sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); this.channelzTrace.addTrace( 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration ); + sessionClosedByServer = true; session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); } - }, this.keepaliveTimeoutMs); - keepaliveTimeout.unref?.(); - - try { - if ( - !session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(keepaliveTimeout); - if (err) { - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error with ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } - } - ) - ) { - throw new Error('Server keepalive ping send failed'); - } - channelzSessionInfo.keepAlivesSent += 1; - } catch (e) { - // The ping can't be sent because the session is already closed, max outstanding pings reached, etc - clearTimeout(keepaliveTimeout); - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error sending ping frame ' + - (e instanceof Error ? e.message : 'unknown error') - ); - session.destroy(); } - }, this.keepaliveTimeMs); - keepaliveInterval.unref?.(); - } + ); - session.once('goaway', (errorCode, lastStreamID, opaqueData) => { - if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) { + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); this.channelzTrace.addTrace( 'CT_INFO', - 'Connection dropped by client due GOAWAY of ENHANCE_YOUR_CALM from ' + - clientAddress - ); - } else { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by client via GOAWAY with error code ' + - errorCode + - ' from ' + - clientAddress + 'Connection dropped due failure to send ping frame' ); + sessionClosedByServer = true; + session.close(); + return; } - sessionClosedByServer = true; - session.destroy(); - }); + + channelzSessionInfo.keepAlivesSent += 1; + + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimeout.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1690,10 +1704,8 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keepaliveInterval) { - clearInterval(keepaliveInterval); - keepaliveInterval = null; - } + keepaliveDisabled = true; + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 71d0f26b..b56cd5b8 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -101,28 +101,29 @@ class Http2Transport implements Transport { /** * The amount of time in between sending pings */ - private keepaliveTimeMs = -1; + private readonly keepaliveTimeMs: number; /** * The amount of time to wait for an acknowledgement after sending a ping */ - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + private readonly keepaliveTimeoutMs: number; /** - * Timer reference for timeout that indicates when to send the next ping + * Indicates whether keepalive pings should be sent without any active calls */ - private keepaliveTimerId: NodeJS.Timeout | null = null; + private readonly keepaliveWithoutCalls: boolean; + /** + * Timer reference indicating when to send the next ping or when the most recent ping will be considered lost. + */ + private keepaliveTimeout: NodeJS.Timeout | null = null; /** * Indicates that the keepalive timer ran out while there were no active * calls, and a ping should be sent the next time a call starts. */ private pendingSendKeepalivePing = false; /** - * Timer reference tracking when the most recent ping will be considered lost + * Indicates when keepalives should no longer be performed for this transport. Used to prevent a race where a + * latent session.ping(..) callback is called after the transport has been notified to disconnect. */ - private keepaliveTimeoutId: NodeJS.Timeout | null = null; - /** - * Indicates whether keepalive pings should be sent without any active calls - */ - private keepaliveWithoutCalls = false; + private keepaliveDisabled = false; private userAgent: string; @@ -182,9 +183,13 @@ class Http2Transport implements Transport { if ('grpc.keepalive_time_ms' in options) { this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; + } else { + this.keepaliveTimeMs = -1; } if ('grpc.keepalive_timeout_ms' in options) { this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } else { + this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS; } if ('grpc.keepalive_permit_without_calls' in options) { this.keepaliveWithoutCalls = @@ -195,7 +200,6 @@ class Http2Transport implements Transport { session.once('close', () => { this.trace('session closed'); - this.stopKeepalivePings(); this.handleDisconnect(); }); @@ -383,6 +387,8 @@ class Http2Transport implements Transport { * Handle connection drops, but not GOAWAYs. */ private handleDisconnect() { + this.keepaliveDisabled = true; + this.clearKeepaliveTimeout(); this.reportDisconnectToOwner(false); /* Give calls an event loop cycle to finish naturally before reporting the * disconnnection to them. */ @@ -397,63 +403,48 @@ class Http2Transport implements Transport { this.disconnectListeners.push(listener); } - private clearKeepaliveTimer() { - if (!this.keepaliveTimerId) { - return; - } - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; - } - - private clearKeepaliveTimeout() { - if (!this.keepaliveTimeoutId) { - return; - } - clearTimeout(this.keepaliveTimeoutId); - this.keepaliveTimeoutId = null; - } - private canSendPing() { return ( + !this.keepaliveDisabled && this.keepaliveTimeMs > 0 && (this.keepaliveWithoutCalls || this.activeCalls.size > 0) ); } private maybeSendPing() { - this.clearKeepaliveTimer(); if (!this.canSendPing()) { this.pendingSendKeepalivePing = true; return; } + if (this.keepaliveTimeout) { + console.error('keepaliveTimeout is not null'); + return; + } if (this.channelzEnabled) { this.keepalivesSent += 1; } this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - if (!this.keepaliveTimeoutId) { - this.keepaliveTimeoutId = setTimeout(() => { - this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(); - }, this.keepaliveTimeoutMs); - this.keepaliveTimeoutId.unref?.(); - } - try { - this.session!.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (err) { - this.keepaliveTrace('Ping failed with error ' + err.message); - this.handleDisconnect(); - } + this.keepaliveTimeout = setTimeout(() => { + this.keepaliveTrace('Ping timeout passed without response'); + this.handleDisconnect(); + }, this.keepaliveTimeoutMs); + this.keepaliveTimeout.unref?.(); + const pingSentSuccessfully = this.session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + this.clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error ' + err.message); + this.handleDisconnect(); + } else { this.keepaliveTrace('Received ping response'); - this.clearKeepaliveTimeout(); this.maybeStartKeepalivePingTimer(); } - ); - } catch (e) { - /* If we fail to send a ping, the connection is no longer functional, so - * we should discard it. */ + } + ); + if (!pingSentSuccessfully) { + this.keepaliveTrace('Ping failed to send'); this.handleDisconnect(); } } @@ -471,25 +462,27 @@ class Http2Transport implements Transport { if (this.pendingSendKeepalivePing) { this.pendingSendKeepalivePing = false; this.maybeSendPing(); - } else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) { + } else if (!this.keepaliveTimeout) { this.keepaliveTrace( 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); - this.keepaliveTimerId = setTimeout(() => { + this.keepaliveTimeout = setTimeout(() => { this.maybeSendPing(); }, this.keepaliveTimeMs); - this.keepaliveTimerId.unref?.(); + this.keepaliveTimeout.unref?.(); } /* Otherwise, there is already either a keepalive timer or a ping pending, * wait for those to resolve. */ } - private stopKeepalivePings() { - if (this.keepaliveTimerId) { - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; + /** + * Clears whichever keepalive timeout is currently active, if any. + */ + private clearKeepaliveTimeout() { + if (this.keepaliveTimeout) { + clearTimeout(this.keepaliveTimeout); + this.keepaliveTimeout = null; } - this.clearKeepaliveTimeout(); } private removeActiveCall(call: Http2SubchannelCall) { @@ -533,7 +526,7 @@ class Http2Transport implements Transport { * error here. */ try { - http2Stream = this.session!.request(headers); + http2Stream = this.session.request(headers); } catch (e) { this.handleDisconnect(); throw e;