unify server and client keepalive matching comments and discussion on first round of review from https://github.com/grpc/grpc-node/pull/2760

This commit is contained in:
David Fiala 2024-05-28 22:26:25 -07:00
parent 334f0dcdb5
commit d799a7a5bd
2 changed files with 204 additions and 199 deletions

View File

@ -435,6 +435,14 @@ export class Server {
);
}
private keepaliveTrace(text: string): void {
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
'(' + this.channelzRef.id + ') ' + text
);
}
addProtoService(): never {
throw new Error('Not implemented. Use addService() instead');
}
@ -1376,7 +1384,8 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keepaliveInterval: NodeJS.Timeout | null = null;
let keepaliveTimeout: NodeJS.Timeout | null = null;
let keepaliveDisabled = false;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1419,72 +1428,73 @@ export class Server {
connectionAgeTimer.unref?.();
}
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keepaliveInterval = setInterval(() => {
const keepaliveTimeout = setTimeout(() => {
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
sessionClosedByServer = true;
this.trace('Connection dropped by keepalive timeout');
session.close();
}
}, this.keepaliveTimeoutMs);
keepaliveTimeout.unref?.();
try {
if (
!session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(keepaliveTimeout);
if (err) {
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
sessionClosedByServer = true;
this.trace(
'Connection dropped due to error with ping frame ' +
err.message +
' return in ' +
duration
);
session.close();
}
}
)
) {
throw new Error('Server keepalive ping send failed');
}
} catch (e) {
// The ping can't be sent because the session is already closed, max outstanding pings reached, etc
clearTimeout(keepaliveTimeout);
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
this.trace(
'Connection dropped due to error sending ping frame ' +
(e instanceof Error ? e.message : 'unknown error')
);
session.destroy();
}
}, this.keepaliveTimeMs);
keepaliveInterval.unref?.();
}
session.once('goaway', (errorCode, lastStreamID, opaqueData) => {
if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) {
this.trace('Connection dropped by client due to ENHANCE_YOUR_CALM');
} else {
this.trace(
'Connection dropped by client via GOAWAY with error code ' +
errorCode
);
const clearKeepaliveTimeout = () => {
if (keepaliveTimeout) {
clearTimeout(keepaliveTimeout);
keepaliveTimeout = null;
}
sessionClosedByServer = true;
session.destroy();
});
};
const canSendPing = () => {
return (
!keepaliveDisabled &&
this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
this.keepaliveTimeMs > 0
);
};
const maybeStartKeepalivePingTimer = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
);
keepaliveTimeout = setTimeout(() => {
clearKeepaliveTimeout();
sendPing();
}, this.keepaliveTimeMs);
keepaliveTimeout.unref?.();
};
const sendPing = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
);
const pingSentSuccessfully = session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearKeepaliveTimeout();
if (err) {
this.keepaliveTrace('Ping failed with error: ' + err.message);
sessionClosedByServer = true;
session.close();
} else {
this.keepaliveTrace('Received ping response');
maybeStartKeepalivePingTimer();
}
}
);
if (!pingSentSuccessfully) {
this.keepaliveTrace('Ping failed to send');
sessionClosedByServer = true;
session.close();
return;
}
keepaliveTimeout = setTimeout(() => {
clearKeepaliveTimeout();
this.keepaliveTrace('Ping timeout passed without response');
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs);
keepaliveTimeout.unref?.();
};
maybeStartKeepalivePingTimer();
session.on('close', () => {
if (!sessionClosedByServer) {
@ -1501,10 +1511,8 @@ export class Server {
clearTimeout(connectionAgeGraceTimer);
}
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
keepaliveDisabled = true;
clearKeepaliveTimeout();
if (idleTimeoutObj !== null) {
clearTimeout(idleTimeoutObj.timeout);
@ -1549,7 +1557,8 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keepaliveInterval: NodeJS.Timeout | null = null;
let keepaliveTimeout: NodeJS.Timeout | null = null;
let keepaliveDisabled = false;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1591,85 +1600,90 @@ export class Server {
connectionAgeTimer.unref?.();
}
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keepaliveInterval = setInterval(() => {
const keepaliveTimeout = setTimeout(() => {
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
sessionClosedByServer = true;
const clearKeepaliveTimeout = () => {
if (keepaliveTimeout) {
clearTimeout(keepaliveTimeout);
keepaliveTimeout = null;
}
};
const canSendPing = () => {
return (
!keepaliveDisabled &&
this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
this.keepaliveTimeMs > 0
);
};
const maybeStartKeepalivePingTimer = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
);
keepaliveTimeout = setTimeout(() => {
clearKeepaliveTimeout();
sendPing();
}, this.keepaliveTimeMs);
keepaliveTimeout.unref?.();
};
const sendPing = () => {
if (!canSendPing()) {
return;
}
this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
);
const pingSentSuccessfully = session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearKeepaliveTimeout();
if (err) {
this.keepaliveTrace('Ping failed with error: ' + err.message);
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
'Connection dropped due to error of a ping frame ' +
err.message +
' return in ' +
duration
);
sessionClosedByServer = true;
session.close();
} else {
this.keepaliveTrace('Received ping response');
maybeStartKeepalivePingTimer();
}
}, this.keepaliveTimeoutMs);
keepaliveTimeout.unref?.();
try {
if (
!session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(keepaliveTimeout);
if (err) {
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped due to error with ping frame ' +
err.message +
' return in ' +
duration
);
session.close();
}
}
)
) {
throw new Error('Server keepalive ping send failed');
}
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
// The ping can't be sent because the session is already closed, max outstanding pings reached, etc
clearTimeout(keepaliveTimeout);
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped due to error sending ping frame ' +
(e instanceof Error ? e.message : 'unknown error')
);
session.destroy();
}
}, this.keepaliveTimeMs);
keepaliveInterval.unref?.();
}
);
session.once('goaway', (errorCode, lastStreamID, opaqueData) => {
if (errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM) {
if (!pingSentSuccessfully) {
this.keepaliveTrace('Ping failed to send');
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by client due GOAWAY of ENHANCE_YOUR_CALM from ' +
clientAddress
);
} else {
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by client via GOAWAY with error code ' +
errorCode +
' from ' +
clientAddress
'Connection dropped due failure to send ping frame'
);
sessionClosedByServer = true;
session.close();
return;
}
sessionClosedByServer = true;
session.destroy();
});
channelzSessionInfo.keepAlivesSent += 1;
keepaliveTimeout = setTimeout(() => {
clearKeepaliveTimeout();
this.keepaliveTrace('Ping timeout passed without response');
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs);
keepaliveTimeout.unref?.();
};
maybeStartKeepalivePingTimer();
session.on('close', () => {
if (!sessionClosedByServer) {
@ -1690,10 +1704,8 @@ export class Server {
clearTimeout(connectionAgeGraceTimer);
}
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = null;
}
keepaliveDisabled = true;
clearKeepaliveTimeout();
if (idleTimeoutObj !== null) {
clearTimeout(idleTimeoutObj.timeout);

View File

@ -101,28 +101,29 @@ class Http2Transport implements Transport {
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs = -1;
private readonly keepaliveTimeMs: number;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
private readonly keepaliveTimeoutMs: number;
/**
* Timer reference for timeout that indicates when to send the next ping
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveTimerId: NodeJS.Timeout | null = null;
private readonly keepaliveWithoutCalls: boolean;
/**
* Timer reference indicating when to send the next ping or when the most recent ping will be considered lost.
*/
private keepaliveTimeout: NodeJS.Timeout | null = null;
/**
* Indicates that the keepalive timer ran out while there were no active
* calls, and a ping should be sent the next time a call starts.
*/
private pendingSendKeepalivePing = false;
/**
* Timer reference tracking when the most recent ping will be considered lost
* Indicates when keepalives should no longer be performed for this transport. Used to prevent a race where a
* latent session.ping(..) callback is called after the transport has been notified to disconnect.
*/
private keepaliveTimeoutId: NodeJS.Timeout | null = null;
/**
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveWithoutCalls = false;
private keepaliveDisabled = false;
private userAgent: string;
@ -182,9 +183,13 @@ class Http2Transport implements Transport {
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
} else {
this.keepaliveTimeMs = -1;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
} else {
this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS;
}
if ('grpc.keepalive_permit_without_calls' in options) {
this.keepaliveWithoutCalls =
@ -195,7 +200,6 @@ class Http2Transport implements Transport {
session.once('close', () => {
this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect();
});
@ -383,6 +387,8 @@ class Http2Transport implements Transport {
* Handle connection drops, but not GOAWAYs.
*/
private handleDisconnect() {
this.keepaliveDisabled = true;
this.clearKeepaliveTimeout();
this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */
@ -397,63 +403,48 @@ class Http2Transport implements Transport {
this.disconnectListeners.push(listener);
}
private clearKeepaliveTimer() {
if (!this.keepaliveTimerId) {
return;
}
clearTimeout(this.keepaliveTimerId);
this.keepaliveTimerId = null;
}
private clearKeepaliveTimeout() {
if (!this.keepaliveTimeoutId) {
return;
}
clearTimeout(this.keepaliveTimeoutId);
this.keepaliveTimeoutId = null;
}
private canSendPing() {
return (
!this.keepaliveDisabled &&
this.keepaliveTimeMs > 0 &&
(this.keepaliveWithoutCalls || this.activeCalls.size > 0)
);
}
private maybeSendPing() {
this.clearKeepaliveTimer();
if (!this.canSendPing()) {
this.pendingSendKeepalivePing = true;
return;
}
if (this.keepaliveTimeout) {
console.error('keepaliveTimeout is not null');
return;
}
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
this.keepaliveTrace(
'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
);
if (!this.keepaliveTimeoutId) {
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
}
try {
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
if (err) {
this.keepaliveTrace('Ping failed with error ' + err.message);
this.handleDisconnect();
}
this.keepaliveTimeout = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeout.unref?.();
const pingSentSuccessfully = this.session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
this.clearKeepaliveTimeout();
if (err) {
this.keepaliveTrace('Ping failed with error ' + err.message);
this.handleDisconnect();
} else {
this.keepaliveTrace('Received ping response');
this.clearKeepaliveTimeout();
this.maybeStartKeepalivePingTimer();
}
);
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
}
);
if (!pingSentSuccessfully) {
this.keepaliveTrace('Ping failed to send');
this.handleDisconnect();
}
}
@ -471,25 +462,27 @@ class Http2Transport implements Transport {
if (this.pendingSendKeepalivePing) {
this.pendingSendKeepalivePing = false;
this.maybeSendPing();
} else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) {
} else if (!this.keepaliveTimeout) {
this.keepaliveTrace(
'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
);
this.keepaliveTimerId = setTimeout(() => {
this.keepaliveTimeout = setTimeout(() => {
this.maybeSendPing();
}, this.keepaliveTimeMs);
this.keepaliveTimerId.unref?.();
this.keepaliveTimeout.unref?.();
}
/* Otherwise, there is already either a keepalive timer or a ping pending,
* wait for those to resolve. */
}
private stopKeepalivePings() {
if (this.keepaliveTimerId) {
clearTimeout(this.keepaliveTimerId);
this.keepaliveTimerId = null;
/**
* Clears whichever keepalive timeout is currently active, if any.
*/
private clearKeepaliveTimeout() {
if (this.keepaliveTimeout) {
clearTimeout(this.keepaliveTimeout);
this.keepaliveTimeout = null;
}
this.clearKeepaliveTimeout();
}
private removeActiveCall(call: Http2SubchannelCall) {
@ -533,7 +526,7 @@ class Http2Transport implements Transport {
* error here.
*/
try {
http2Stream = this.session!.request(headers);
http2Stream = this.session.request(headers);
} catch (e) {
this.handleDisconnect();
throw e;