chore: updated docs, cached onStreamClose per session

This commit is contained in:
AVVS 2024-02-27 16:49:20 -08:00
parent 74102fcc87
commit 11a98b5f37
No known key found for this signature in database
GPG Key ID: 2B890ABACCC3E369
3 changed files with 112 additions and 72 deletions

View File

@ -60,6 +60,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.enable_channelz`
- `grpc.dns_min_time_between_resolutions_ms`
- `grpc.enable_retries`
- `grpc.max_connection_age_ms`
- `grpc.max_connection_age_grace_ms`
- `grpc.max_connection_idle_ms`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc.service_config_disable_resolution`

View File

@ -54,6 +54,7 @@ export interface ChannelOptions {
'grpc.retry_buffer_size'?: number;
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc.max_connection_idle_ms'?: number;
'grpc-node.max_session_memory'?: number;
'grpc.service_config_disable_resolution'?: number;
'grpc.client_idle_timeout_ms'?: number;

View File

@ -90,7 +90,7 @@ import { CallEventTracker } from './transport';
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
const MAX_CONNECTION_IDLE_MS = 30 * 60 * 1e3; // 30 min
const MAX_CONNECTION_IDLE_MS = ~(1 << 31);
const { HTTP2_HEADER_PATH } = http2.constants;
@ -241,6 +241,12 @@ interface Http2ServerInfo {
sessions: Set<http2.ServerHttp2Session>;
}
interface SessionIdleTimeoutTracker {
activeStreams: number;
timeout: NodeJS.Timeout | null;
onClose: (session: http2.ServerHttp2Session) => void | null;
}
export interface ServerOptions extends ChannelOptions {
interceptors?: ServerInterceptor[];
}
@ -249,11 +255,8 @@ export class Server {
private boundPorts: Map<string, BoundPort> = new Map();
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
private sessionIdleTimeouts = new Map<
http2.Http2Session,
{
activeStreams: number;
timeout: NodeJS.Timeout | null;
}
http2.ServerHttp2Session,
SessionIdleTimeoutTracker
>();
private handlers: Map<string, UntypedHandler> = new Map<
@ -330,7 +333,7 @@ export class Server {
this.keepaliveTimeoutMs =
this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
this.sessionIdleTimeout =
this.options['grpc.max_connection_idle'] ?? MAX_CONNECTION_IDLE_MS;
this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS;
this.commonServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
@ -903,7 +906,6 @@ export class Server {
if (sessionInfo) {
this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
unregisterChannelzRef(sessionInfo.ref);
this.sessions.delete(session);
}
callback?.();
};
@ -1001,7 +1003,7 @@ export class Server {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref?.();
}, graceTimeMs).unref();
}
forceShutdown(): void {
@ -1376,6 +1378,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1389,7 +1392,8 @@ export class Server {
sessionClosedByServer = true;
this.trace(
`Connection dropped by max connection age: ${session.socket?.remoteAddress}`
'Connection dropped by max connection age: ' +
session.socket?.remoteAddress
);
try {
@ -1410,36 +1414,38 @@ export class Server {
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}, this.maxConnectionAgeGraceMs).unref();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}, this.maxConnectionAgeMs + jitter).unref();
}
const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs).unref?.();
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keeapliveTimeTimer = setInterval(() => {
const timeoutTimer = setTimeout(() => {
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs).unref();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTimer);
if (err) {
sessionClosedByServer = true;
this.trace(
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
);
session.close();
if (err) {
sessionClosedByServer = true;
this.trace(
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
);
session.close();
}
}
}
);
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref?.();
);
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref();
}
session.on('close', () => {
if (!sessionClosedByServer) {
@ -1460,8 +1466,12 @@ export class Server {
clearTimeout(keeapliveTimeTimer);
}
clearTimeout(idleTimeoutObj.timeout);
this.sessionIdleTimeouts.delete(session);
if (idleTimeoutObj !== null) {
if (idleTimeoutObj.timeout !== null) {
clearTimeout(idleTimeoutObj.timeout);
}
this.sessionIdleTimeouts.delete(session);
}
this.http2Servers.get(http2Server)?.sessions.delete(session);
});
@ -1503,6 +1513,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1537,43 +1548,48 @@ export class Server {
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref?.();
}, this.maxConnectionAgeGraceMs).unref();
}
}, this.maxConnectionAgeMs + jitter).unref?.();
}, this.maxConnectionAgeMs + jitter).unref();
}
const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keeapliveTimeTimer = setInterval(() => {
const timeoutTImer = setTimeout(() => {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped by keepalive timeout from ' + clientAddress
);
session.close();
}, this.keepaliveTimeoutMs).unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
session.close();
}, this.keepaliveTimeoutMs).unref();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
if (err) {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
);
if (err) {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
'Connection dropped due to error of a ping frame ' +
err.message +
' return in ' +
duration
);
session.close();
session.close();
}
}
}
);
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref?.();
);
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref();
}
session.on('close', () => {
if (!sessionClosedByServer) {
@ -1598,8 +1614,12 @@ export class Server {
clearTimeout(keeapliveTimeTimer);
}
clearTimeout(idleTimeoutObj.timeout);
this.sessionIdleTimeouts.delete(session);
if (idleTimeoutObj !== null) {
if (idleTimeoutObj.timeout !== null) {
clearTimeout(idleTimeoutObj.timeout);
}
this.sessionIdleTimeouts.delete(session);
}
this.http2Servers.get(http2Server)?.sessions.delete(session);
this.sessions.delete(session);
@ -1607,9 +1627,16 @@ export class Server {
};
}
private enableIdleTimeout(session: http2.ServerHttp2Session) {
private enableIdleTimeout(
session: http2.ServerHttp2Session
): SessionIdleTimeoutTracker | null {
if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) {
return null;
}
const idleTimeoutObj = {
activeStreams: 0,
onClose: this.onStreamClose.bind(this, session), // so that we don't recreate it each time
timeout: setTimeout(
this.onIdleTimeout,
this.sessionIdleTimeout,
@ -1619,13 +1646,22 @@ export class Server {
};
this.sessionIdleTimeouts.set(session, idleTimeoutObj);
this.trace(`Enable idle timeout for ${session.socket?.remoteAddress}`);
const { socket } = session;
this.trace(
'Enable idle timeout for ' +
socket.remoteAddress +
':' +
socket.remotePort
);
return idleTimeoutObj;
}
private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) {
ctx.trace(`Idle timeout for ${session.socket?.remoteAddress}`);
const { socket } = session;
ctx.trace(
'Idle timeout for ' + socket?.remoteAddress + ':' + socket?.remotePort
);
ctx.closeSession(session);
}
@ -1640,7 +1676,7 @@ export class Server {
idleTimeoutObj.timeout = null;
}
stream.once('close', () => this.onStreamClose(session));
stream.once('close', idleTimeoutObj.onClose);
}
}