Merge pull request #2760 from davidfiala/@grpc/grpc-js@1.10.x

Keepalive bugfixes and unify timers strategies between client and server
This commit is contained in:
Michael Lumish 2024-06-18 15:27:11 -07:00 committed by GitHub
commit 5c0226d0db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 237 additions and 139 deletions

View File

@ -435,6 +435,14 @@ export class Server {
); );
} }
private keepaliveTrace(text: string): void {
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
'(' + this.channelzRef.id + ') ' + text
);
}
addProtoService(): never { addProtoService(): never {
throw new Error('Not implemented. Use addService() instead'); throw new Error('Not implemented. Use addService() instead');
} }
@ -1376,8 +1384,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null; let keepaliveTimer: NodeJS.Timeout | null = null;
let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false; let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session); const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1420,41 +1427,90 @@ export class Server {
connectionAgeTimer.unref?.(); connectionAgeTimer.unref?.();
} }
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { const clearKeepaliveTimeout = () => {
keeapliveTimeTimer = setInterval(() => { if (keepaliveTimer) {
keepaliveTimeoutTimer = setTimeout(() => { clearTimeout(keepaliveTimer);
sessionClosedByServer = true; keepaliveTimer = null;
session.close(); }
}, this.keepaliveTimeoutMs); };
keepaliveTimeoutTimer.unref?.();
try { const canSendPing = () => {
session.ping( return (
(err: Error | null, duration: number, payload: Buffer) => { !session.destroyed &&
if (keepaliveTimeoutTimer) { this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
clearTimeout(keepaliveTimeoutTimer); this.keepaliveTimeMs > 0
} );
};
if (err) { /* eslint-disable-next-line prefer-const */
sessionClosedByServer = true; let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer
this.trace(
'Connection dropped due to error of a ping frame ' + const maybeStartKeepalivePingTimer = () => {
err.message + if (!canSendPing()) {
' return in ' + return;
duration }
); this.keepaliveTrace(
session.close(); 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
} );
} keepaliveTimer = setTimeout(() => {
); clearKeepaliveTimeout();
} catch (e) { sendPing();
clearTimeout(keepaliveTimeoutTimer);
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs); }, this.keepaliveTimeMs);
keeapliveTimeTimer.unref?.(); keepaliveTimer.unref?.();
} };
sendPing = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
);
let pingSendError = '';
try {
const pingSentSuccessfully = session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearKeepaliveTimeout();
if (err) {
this.keepaliveTrace('Ping failed with error: ' + err.message);
sessionClosedByServer = true;
session.close();
} else {
this.keepaliveTrace('Received ping response');
maybeStartKeepalivePingTimer();
}
}
);
if (!pingSentSuccessfully) {
pingSendError = 'Ping returned false';
}
} catch (e) {
// grpc/grpc-node#2139
pingSendError =
(e instanceof Error ? e.message : '') || 'Unknown error';
}
if (pingSendError) {
this.keepaliveTrace('Ping send failed: ' + pingSendError);
this.trace(
'Connection dropped due to ping send error: ' + pingSendError
);
sessionClosedByServer = true;
session.close();
return;
}
keepaliveTimer = setTimeout(() => {
clearKeepaliveTimeout();
this.keepaliveTrace('Ping timeout passed without response');
this.trace('Connection dropped by keepalive timeout');
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs);
keepaliveTimer.unref?.();
};
maybeStartKeepalivePingTimer();
session.on('close', () => { session.on('close', () => {
if (!sessionClosedByServer) { if (!sessionClosedByServer) {
@ -1471,12 +1527,7 @@ export class Server {
clearTimeout(connectionAgeGraceTimer); clearTimeout(connectionAgeGraceTimer);
} }
if (keeapliveTimeTimer) { clearKeepaliveTimeout();
clearInterval(keeapliveTimeTimer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
}
if (idleTimeoutObj !== null) { if (idleTimeoutObj !== null) {
clearTimeout(idleTimeoutObj.timeout); clearTimeout(idleTimeoutObj.timeout);
@ -1521,8 +1572,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null; let keepaliveTimeout: NodeJS.Timeout | null = null;
let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false; let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session); const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1564,49 +1614,103 @@ export class Server {
connectionAgeTimer.unref?.(); connectionAgeTimer.unref?.();
} }
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { const clearKeepaliveTimeout = () => {
keeapliveTimeTimer = setInterval(() => { if (keepaliveTimeout) {
keepaliveTimeoutTimer = setTimeout(() => { clearTimeout(keepaliveTimeout);
sessionClosedByServer = true; keepaliveTimeout = null;
this.channelzTrace.addTrace( }
'CT_INFO', };
'Connection dropped by keepalive timeout from ' + clientAddress
);
session.close(); const canSendPing = () => {
}, this.keepaliveTimeoutMs); return (
keepaliveTimeoutTimer.unref?.(); !session.destroyed &&
this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
this.keepaliveTimeMs > 0
);
};
try { /* eslint-disable-next-line prefer-const */
session.ping( let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer
(err: Error | null, duration: number, payload: Buffer) => {
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
if (err) { const maybeStartKeepalivePingTimer = () => {
sessionClosedByServer = true; if (!canSendPing()) {
this.channelzTrace.addTrace( return;
'CT_INFO', }
'Connection dropped due to error of a ping frame ' + this.keepaliveTrace(
err.message + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
' return in ' + );
duration keepaliveTimeout = setTimeout(() => {
); clearKeepaliveTimeout();
sendPing();
session.close();
}
}
);
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
clearTimeout(keepaliveTimeoutTimer);
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs); }, this.keepaliveTimeMs);
keeapliveTimeTimer.unref?.(); keepaliveTimeout.unref?.();
} };
sendPing = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
);
let pingSendError = '';
try {
const pingSentSuccessfully = session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearKeepaliveTimeout();
if (err) {
this.keepaliveTrace('Ping failed with error: ' + err.message);
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped due to error of a ping frame ' +
err.message +
' return in ' +
duration
);
sessionClosedByServer = true;
session.close();
} else {
this.keepaliveTrace('Received ping response');
maybeStartKeepalivePingTimer();
}
}
);
if (!pingSentSuccessfully) {
pingSendError = 'Ping returned false';
}
} catch (e) {
// grpc/grpc-node#2139
pingSendError =
(e instanceof Error ? e.message : '') || 'Unknown error';
}
if (pingSendError) {
this.keepaliveTrace('Ping send failed: ' + pingSendError);
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped due to ping send error: ' + pingSendError
);
sessionClosedByServer = true;
session.close();
return;
}
channelzSessionInfo.keepAlivesSent += 1;
keepaliveTimeout = setTimeout(() => {
clearKeepaliveTimeout();
this.keepaliveTrace('Ping timeout passed without response');
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs);
keepaliveTimeout.unref?.();
};
maybeStartKeepalivePingTimer();
session.on('close', () => { session.on('close', () => {
if (!sessionClosedByServer) { if (!sessionClosedByServer) {
@ -1627,12 +1731,7 @@ export class Server {
clearTimeout(connectionAgeGraceTimer); clearTimeout(connectionAgeGraceTimer);
} }
if (keeapliveTimeTimer) { clearKeepaliveTimeout();
clearInterval(keeapliveTimeTimer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
}
if (idleTimeoutObj !== null) { if (idleTimeoutObj !== null) {
clearTimeout(idleTimeoutObj.timeout); clearTimeout(idleTimeoutObj.timeout);

View File

@ -102,28 +102,24 @@ class Http2Transport implements Transport {
/** /**
* The amount of time in between sending pings * The amount of time in between sending pings
*/ */
private keepaliveTimeMs = -1; private readonly keepaliveTimeMs: number;
/** /**
* The amount of time to wait for an acknowledgement after sending a ping * The amount of time to wait for an acknowledgement after sending a ping
*/ */
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; private readonly keepaliveTimeoutMs: number;
/** /**
* Timer reference for timeout that indicates when to send the next ping * Indicates whether keepalive pings should be sent without any active calls
*/ */
private keepaliveTimerId: NodeJS.Timeout | null = null; private readonly keepaliveWithoutCalls: boolean;
/**
* Timer reference indicating when to send the next ping or when the most recent ping will be considered lost.
*/
private keepaliveTimer: NodeJS.Timeout | null = null;
/** /**
* Indicates that the keepalive timer ran out while there were no active * Indicates that the keepalive timer ran out while there were no active
* calls, and a ping should be sent the next time a call starts. * calls, and a ping should be sent the next time a call starts.
*/ */
private pendingSendKeepalivePing = false; private pendingSendKeepalivePing = false;
/**
* Timer reference tracking when the most recent ping will be considered lost
*/
private keepaliveTimeoutId: NodeJS.Timeout | null = null;
/**
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveWithoutCalls = false;
private userAgent: string; private userAgent: string;
@ -183,9 +179,13 @@ class Http2Transport implements Transport {
if ('grpc.keepalive_time_ms' in options) { if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
} else {
this.keepaliveTimeMs = -1;
} }
if ('grpc.keepalive_timeout_ms' in options) { if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
} else {
this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS;
} }
if ('grpc.keepalive_permit_without_calls' in options) { if ('grpc.keepalive_permit_without_calls' in options) {
this.keepaliveWithoutCalls = this.keepaliveWithoutCalls =
@ -196,7 +196,6 @@ class Http2Transport implements Transport {
session.once('close', () => { session.once('close', () => {
this.trace('session closed'); this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect(); this.handleDisconnect();
}); });
@ -384,6 +383,7 @@ class Http2Transport implements Transport {
* Handle connection drops, but not GOAWAYs. * Handle connection drops, but not GOAWAYs.
*/ */
private handleDisconnect() { private handleDisconnect() {
this.clearKeepaliveTimeout();
this.reportDisconnectToOwner(false); this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the /* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */ * disconnnection to them. */
@ -391,6 +391,7 @@ class Http2Transport implements Transport {
for (const call of this.activeCalls) { for (const call of this.activeCalls) {
call.onDisconnect(); call.onDisconnect();
} }
this.session.destroy();
}); });
} }
@ -398,63 +399,58 @@ class Http2Transport implements Transport {
this.disconnectListeners.push(listener); this.disconnectListeners.push(listener);
} }
private clearKeepaliveTimer() {
if (!this.keepaliveTimerId) {
return;
}
clearTimeout(this.keepaliveTimerId);
this.keepaliveTimerId = null;
}
private clearKeepaliveTimeout() {
if (!this.keepaliveTimeoutId) {
return;
}
clearTimeout(this.keepaliveTimeoutId);
this.keepaliveTimeoutId = null;
}
private canSendPing() { private canSendPing() {
return ( return (
!this.session.destroyed &&
this.keepaliveTimeMs > 0 && this.keepaliveTimeMs > 0 &&
(this.keepaliveWithoutCalls || this.activeCalls.size > 0) (this.keepaliveWithoutCalls || this.activeCalls.size > 0)
); );
} }
private maybeSendPing() { private maybeSendPing() {
this.clearKeepaliveTimer();
if (!this.canSendPing()) { if (!this.canSendPing()) {
this.pendingSendKeepalivePing = true; this.pendingSendKeepalivePing = true;
return; return;
} }
if (this.keepaliveTimer) {
console.error('keepaliveTimeout is not null');
return;
}
if (this.channelzEnabled) { if (this.channelzEnabled) {
this.keepalivesSent += 1; this.keepalivesSent += 1;
} }
this.keepaliveTrace( this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
); );
if (!this.keepaliveTimeoutId) { this.keepaliveTimer = setTimeout(() => {
this.keepaliveTimeoutId = setTimeout(() => { this.keepaliveTimer = null;
this.keepaliveTrace('Ping timeout passed without response'); this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect(); this.handleDisconnect();
}, this.keepaliveTimeoutMs); }, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.(); this.keepaliveTimer.unref?.();
} let pingSendError = '';
try { try {
this.session!.ping( const pingSentSuccessfully = this.session.ping(
(err: Error | null, duration: number, payload: Buffer) => { (err: Error | null, duration: number, payload: Buffer) => {
this.clearKeepaliveTimeout();
if (err) { if (err) {
this.keepaliveTrace('Ping failed with error ' + err.message); this.keepaliveTrace('Ping failed with error ' + err.message);
this.handleDisconnect(); this.handleDisconnect();
} else {
this.keepaliveTrace('Received ping response');
this.maybeStartKeepalivePingTimer();
} }
this.keepaliveTrace('Received ping response');
this.clearKeepaliveTimeout();
this.maybeStartKeepalivePingTimer();
} }
); );
if (!pingSentSuccessfully) {
pingSendError = 'Ping returned false';
}
} catch (e) { } catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so // grpc/grpc-node#2139
* we should discard it. */ pingSendError = (e instanceof Error ? e.message : '') || 'Unknown error';
}
if (pingSendError) {
this.keepaliveTrace('Ping send failed: ' + pingSendError);
this.handleDisconnect(); this.handleDisconnect();
} }
} }
@ -472,25 +468,28 @@ class Http2Transport implements Transport {
if (this.pendingSendKeepalivePing) { if (this.pendingSendKeepalivePing) {
this.pendingSendKeepalivePing = false; this.pendingSendKeepalivePing = false;
this.maybeSendPing(); this.maybeSendPing();
} else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) { } else if (!this.keepaliveTimer) {
this.keepaliveTrace( this.keepaliveTrace(
'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
); );
this.keepaliveTimerId = setTimeout(() => { this.keepaliveTimer = setTimeout(() => {
this.keepaliveTimer = null;
this.maybeSendPing(); this.maybeSendPing();
}, this.keepaliveTimeMs); }, this.keepaliveTimeMs);
this.keepaliveTimerId.unref?.(); this.keepaliveTimer.unref?.();
} }
/* Otherwise, there is already either a keepalive timer or a ping pending, /* Otherwise, there is already either a keepalive timer or a ping pending,
* wait for those to resolve. */ * wait for those to resolve. */
} }
private stopKeepalivePings() { /**
if (this.keepaliveTimerId) { * Clears whichever keepalive timeout is currently active, if any.
clearTimeout(this.keepaliveTimerId); */
this.keepaliveTimerId = null; private clearKeepaliveTimeout() {
if (this.keepaliveTimer) {
clearTimeout(this.keepaliveTimer);
this.keepaliveTimer = null;
} }
this.clearKeepaliveTimeout();
} }
private removeActiveCall(call: Http2SubchannelCall) { private removeActiveCall(call: Http2SubchannelCall) {
@ -534,7 +533,7 @@ class Http2Transport implements Transport {
* error here. * error here.
*/ */
try { try {
http2Stream = this.session!.request(headers); http2Stream = this.session.request(headers);
} catch (e) { } catch (e) {
this.handleDisconnect(); this.handleDisconnect();
throw e; throw e;