chore: tests & cleanup of unref?.()

This commit is contained in:
AVVS 2024-03-02 07:58:54 -08:00
parent b873dce908
commit 62e8ea97e6
No known key found for this signature in database
GPG Key ID: 2B890ABACCC3E369
8 changed files with 235 additions and 91 deletions

View File

@ -20,9 +20,9 @@
"@types/lodash": "^4.14.202",
"@types/mocha": "^10.0.6",
"@types/ncp": "^2.0.8",
"@types/node": ">=20.11.20",
"@types/pify": "^5.0.4",
"@types/semver": "^7.5.8",
"@types/node": ">=20.11.20",
"@typescript-eslint/eslint-plugin": "^7.1.0",
"@typescript-eslint/parser": "^7.1.0",
"@typescript-eslint/typescript-estree": "^7.1.0",

View File

@ -415,7 +415,8 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
this.connectionDelayTimeout = setTimeout(() => {
this.startNextSubchannelConnecting(subchannelIndex + 1);
}, CONNECTION_DELAY_INTERVAL_MS).unref?.();
}, CONNECTION_DELAY_INTERVAL_MS);
this.connectionDelayTimeout.unref?.();
}
private pickSubchannel(subchannel: SubchannelInterface) {

View File

@ -309,7 +309,8 @@ class DnsResolver implements Resolver {
if (this.continueResolving) {
this.startResolutionWithBackoff();
}
}, this.minTimeBetweenResolutionsMs).unref?.();
}, this.minTimeBetweenResolutionsMs);
this.nextResolutionTimer.unref?.();
this.isNextResolutionTimerRunning = true;
}

View File

@ -212,6 +212,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
methodConfig: [],
};
}
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.childLoadBalancer = new ChildLoadBalancerHandler(
{

View File

@ -95,6 +95,7 @@ const MAX_CONNECTION_IDLE_MS = ~(1 << 31);
const { HTTP2_HEADER_PATH } = http2.constants;
const TRACER_NAME = 'server';
const kMaxAge = Buffer.from('max_age');
type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
@ -369,65 +370,61 @@ export class Server {
private getChannelzSessionInfoGetter(
session: http2.ServerHttp2Session
): () => SocketInfo {
return () => {
const sessionInfo = this.sessions.get(session)!;
const sessionSocket = session.socket;
const remoteAddress = sessionSocket.remoteAddress
? stringToSubchannelAddress(
sessionSocket.remoteAddress,
sessionSocket.remotePort
)
: null;
const localAddress = sessionSocket.localAddress
? stringToSubchannelAddress(
sessionSocket.localAddress!,
sessionSocket.localPort
)
: null;
let tlsInfo: TlsInfo | null;
if (session.encrypted) {
const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: cipherInfo.standardName ?? null,
cipherSuiteOtherName: cipherInfo.standardName
? null
: cipherInfo.name,
localCertificate:
certificate && 'raw' in certificate ? certificate.raw : null,
remoteCertificate:
peerCertificate && 'raw' in peerCertificate
? peerCertificate.raw
: null,
};
} else {
tlsInfo = null;
}
const socketInfo: SocketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: null,
streamsStarted: sessionInfo.streamTracker.callsStarted,
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
streamsFailed: sessionInfo.streamTracker.callsFailed,
messagesSent: sessionInfo.messagesSent,
messagesReceived: sessionInfo.messagesReceived,
keepAlivesSent: sessionInfo.keepAlivesSent,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp:
sessionInfo.streamTracker.lastCallStartedTimestamp,
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
localFlowControlWindow: session.state.localWindowSize ?? null,
remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
): SocketInfo {
const sessionInfo = this.sessions.get(session)!;
const sessionSocket = session.socket;
const remoteAddress = sessionSocket.remoteAddress
? stringToSubchannelAddress(
sessionSocket.remoteAddress,
sessionSocket.remotePort
)
: null;
const localAddress = sessionSocket.localAddress
? stringToSubchannelAddress(
sessionSocket.localAddress!,
sessionSocket.localPort
)
: null;
let tlsInfo: TlsInfo | null;
if (session.encrypted) {
const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: cipherInfo.standardName ?? null,
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
localCertificate:
certificate && 'raw' in certificate ? certificate.raw : null,
remoteCertificate:
peerCertificate && 'raw' in peerCertificate
? peerCertificate.raw
: null,
};
return socketInfo;
} else {
tlsInfo = null;
}
const socketInfo: SocketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: null,
streamsStarted: sessionInfo.streamTracker.callsStarted,
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
streamsFailed: sessionInfo.streamTracker.callsFailed,
messagesSent: sessionInfo.messagesSent,
messagesReceived: sessionInfo.messagesReceived,
keepAlivesSent: sessionInfo.keepAlivesSent,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp:
sessionInfo.streamTracker.lastCallStartedTimestamp,
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
localFlowControlWindow: session.state.localWindowSize ?? null,
remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
};
return socketInfo;
}
private trace(text: string): void {
@ -1004,7 +1001,7 @@ export class Server {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref();
}, graceTimeMs).unref?.();
}
forceShutdown(): void {
@ -1380,6 +1377,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null;
let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1401,7 +1399,7 @@ export class Server {
session.goaway(
http2.constants.NGHTTP2_NO_ERROR,
~(1 << 31),
Buffer.from('max_age')
kMaxAge
);
} catch (e) {
// The goaway can't be sent because the session is already closed
@ -1415,37 +1413,47 @@ export class Server {
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref();
}, this.maxConnectionAgeGraceMs);
connectionAgeGraceTimer.unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref();
}, this.maxConnectionAgeMs + jitter);
connectionAgeTimer.unref?.();
}
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keeapliveTimeTimer = setInterval(() => {
const timeoutTimer = setTimeout(() => {
keepaliveTimeoutTimer = setTimeout(() => {
sessionClosedByServer = true;
session.close();
}, this.keepaliveTimeoutMs).unref();
}, this.keepaliveTimeoutMs);
keepaliveTimeoutTimer.unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTimer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
if (err) {
sessionClosedByServer = true;
this.trace(
`Connection dropped due to error of a ping frame ${err.message} return in ${duration}`
'Connection dropped due to error of a ping frame ' +
err.message +
' return in ' +
duration
);
session.close();
}
}
);
} catch (e) {
clearTimeout(keepaliveTimeoutTimer);
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref();
}, this.keepaliveTimeMs);
keeapliveTimeTimer.unref?.();
}
session.on('close', () => {
@ -1464,7 +1472,10 @@ export class Server {
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
clearInterval(keeapliveTimeTimer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
}
if (idleTimeoutObj !== null) {
@ -1483,15 +1494,13 @@ export class Server {
return (session: http2.ServerHttp2Session) => {
const channelzRef = registerChannelzSocket(
session.socket?.remoteAddress ?? 'unknown',
this.getChannelzSessionInfoGetter(session),
this.getChannelzSessionInfoGetter.bind(this, session),
this.channelzEnabled
);
const channelzSessionInfo: ChannelzSessionInfo = {
ref: channelzRef,
streamTracker: this.channelzEnabled
? new ChannelzCallTracker()
: new ChannelzCallTrackerStub(),
streamTracker: new ChannelzCallTracker(),
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
@ -1513,6 +1522,7 @@ export class Server {
let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keeapliveTimeTimer: NodeJS.Timeout | null = null;
let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
let sessionClosedByServer = false;
const idleTimeoutObj = this.enableIdleTimeout(session);
@ -1533,7 +1543,7 @@ export class Server {
session.goaway(
http2.constants.NGHTTP2_NO_ERROR,
~(1 << 31),
Buffer.from('max_age')
kMaxAge
);
} catch (e) {
// The goaway can't be sent because the session is already closed
@ -1547,14 +1557,16 @@ export class Server {
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs).unref();
}, this.maxConnectionAgeGraceMs);
connectionAgeGraceTimer.unref?.();
}
}, this.maxConnectionAgeMs + jitter).unref();
}, this.maxConnectionAgeMs + jitter);
connectionAgeTimer.unref?.();
}
if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
keeapliveTimeTimer = setInterval(() => {
const timeoutTImer = setTimeout(() => {
keepaliveTimeoutTimer = setTimeout(() => {
sessionClosedByServer = true;
this.channelzTrace.addTrace(
'CT_INFO',
@ -1562,11 +1574,15 @@ export class Server {
);
session.close();
}, this.keepaliveTimeoutMs).unref();
}, this.keepaliveTimeoutMs);
keepaliveTimeoutTimer.unref?.();
try {
session.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(timeoutTImer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
if (err) {
sessionClosedByServer = true;
@ -1584,10 +1600,12 @@ export class Server {
);
channelzSessionInfo.keepAlivesSent += 1;
} catch (e) {
clearTimeout(keepaliveTimeoutTimer);
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs).unref();
}, this.keepaliveTimeMs);
keeapliveTimeTimer.unref?.();
}
session.on('close', () => {
@ -1610,7 +1628,10 @@ export class Server {
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
clearInterval(keeapliveTimeTimer);
if (keepaliveTimeoutTimer) {
clearTimeout(keepaliveTimeoutTimer);
}
}
if (idleTimeoutObj !== null) {
@ -1640,8 +1661,9 @@ export class Server {
this.sessionIdleTimeout,
this,
session
).unref(),
),
};
idleTimeoutObj.timeout.unref?.();
this.sessionIdleTimeouts.set(session, idleTimeoutObj);
const { socket } = session;
@ -1661,13 +1683,6 @@ export class Server {
session: http2.ServerHttp2Session
) {
const { socket } = session;
ctx.trace(
'Session idle timeout checkpoint for ' +
socket?.remoteAddress +
':' +
socket?.remotePort
);
const sessionInfo = ctx.sessionIdleTimeouts.get(session);
// if it is called while we have activeStreams - timer will not be rescheduled
@ -1679,6 +1694,15 @@ export class Server {
sessionInfo.activeStreams === 0 &&
Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout
) {
ctx.trace(
'Session idle timeout triggered for ' +
socket?.remoteAddress +
':' +
socket?.remotePort +
' last idle at ' +
sessionInfo.lastIdle
);
ctx.closeSession(session);
}
}
@ -1701,6 +1725,15 @@ export class Server {
if (idleTimeoutObj.activeStreams === 0) {
idleTimeoutObj.lastIdle = Date.now();
idleTimeoutObj.timeout.refresh();
this.trace(
'Session onStreamClose' +
session.socket?.remoteAddress +
':' +
session.socket?.remotePort +
' at ' +
idleTimeoutObj.lastIdle
);
}
}
}

View File

@ -477,7 +477,8 @@ class Http2Transport implements Transport {
);
this.keepaliveTimerId = setTimeout(() => {
this.maybeSendPing();
}, this.keepaliveTimeMs).unref?.();
}, this.keepaliveTimeMs);
this.keepaliveTimerId.unref?.();
}
/* Otherwise, there is already either a keepalive timer or a ping pending,
* wait for those to resolve. */

View File

@ -140,6 +140,27 @@ export class TestClient {
return this.client.getChannel().getConnectivityState(false);
}
waitForClientState(
deadline: grpc.Deadline,
state: ConnectivityState,
callback: (error?: Error) => void
) {
this.client
.getChannel()
.watchConnectivityState(this.getChannelState(), deadline, err => {
if (err) {
return callback(err);
}
const currentState = this.getChannelState();
if (currentState === state) {
callback();
} else {
return this.waitForClientState(deadline, currentState, callback);
}
});
}
close() {
this.client.close();
}

View File

@ -128,3 +128,89 @@ describe('Channel idle timer', () => {
});
});
});
describe('Server idle timer', () => {
let server: TestServer;
let client: TestClient | null = null;
before(() => {
server = new TestServer(false, {
'grpc.max_connection_idle_ms': 500, // small for testing purposes
});
return server.start();
});
afterEach(() => {
if (client) {
client.close();
client = null;
}
});
after(() => {
server.shutdown();
});
it('Should go idle after the specified time after a request ends', function (done) {
this.timeout(5000);
client = TestClient.createFromServer(server);
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.READY
);
client?.waitForClientState(
Date.now() + 600,
grpc.connectivityState.IDLE,
done
);
});
});
it('Should be able to make a request after going idle', function (done) {
this.timeout(5000);
client = TestClient.createFromServer(server);
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.READY
);
client!.waitForClientState(
Date.now() + 600,
grpc.connectivityState.IDLE,
err => {
if (err) return done(err);
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.IDLE
);
client!.sendRequest(error => {
assert.ifError(error);
done();
});
}
);
});
});
it('Should go idle after the specified time after waitForReady ends', function (done) {
this.timeout(5000);
client = TestClient.createFromServer(server);
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
client.waitForReady(deadline, error => {
assert.ifError(error);
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.READY
);
client!.waitForClientState(
Date.now() + 600,
grpc.connectivityState.IDLE,
done
);
});
});
});