Merge pull request #2750 from murgatroid99/grpc-js_idle_uds_fix

grpc-js: Fix UDS channels not reconnecting after going idle
This commit is contained in:
Michael Lumish 2024-05-15 09:17:41 -07:00 committed by GitHub
commit 45e5fe5462
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 86 additions and 20 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.10.7", "version": "1.10.8",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -50,7 +50,7 @@ class UdsResolver implements Resolver {
} }
destroy() { destroy() {
// This resolver owns no resources, so we do nothing here. this.hasReturnedResult = false;
} }
static getDefaultAuthority(target: GrpcUri): string { static getDefaultAuthority(target: GrpcUri): string {

View File

@ -19,6 +19,8 @@ import * as loader from '@grpc/proto-loader';
import * as assert2 from './assert2'; import * as assert2 from './assert2';
import * as path from 'path'; import * as path from 'path';
import * as grpc from '../src'; import * as grpc from '../src';
import * as fsPromises from 'fs/promises';
import * as os from 'os';
import { import {
GrpcObject, GrpcObject,
@ -71,54 +73,77 @@ const serviceImpl = {
export class TestServer { export class TestServer {
private server: grpc.Server; private server: grpc.Server;
public port: number | null = null; private target: string | null = null;
constructor(public useTls: boolean, options?: grpc.ServerOptions) { constructor(public useTls: boolean, options?: grpc.ServerOptions) {
this.server = new grpc.Server(options); this.server = new grpc.Server(options);
this.server.addService(echoService.service, serviceImpl); this.server.addService(echoService.service, serviceImpl);
} }
start(): Promise<void> {
let credentials: grpc.ServerCredentials; private getCredentials(): grpc.ServerCredentials {
if (this.useTls) { if (this.useTls) {
credentials = grpc.ServerCredentials.createSsl(null, [ return grpc.ServerCredentials.createSsl(null, [
{ private_key: key, cert_chain: cert }, { private_key: key, cert_chain: cert },
]); ]);
} else { } else {
credentials = grpc.ServerCredentials.createInsecure(); return grpc.ServerCredentials.createInsecure();
} }
}
start(): Promise<void> {
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this.server.bindAsync('localhost:0', credentials, (error, port) => { this.server.bindAsync('localhost:0', this.getCredentials(), (error, port) => {
if (error) { if (error) {
reject(error); reject(error);
return; return;
} }
this.port = port; this.target = `localhost:${port}`;
resolve(); resolve();
}); });
}); });
} }
startUds(): Promise<void> {
return fsPromises.mkdtemp(path.join(os.tmpdir(), 'uds')).then(dir => {
return new Promise<void>((resolve, reject) => {
const target = `unix://${dir}/socket`;
this.server.bindAsync(target, this.getCredentials(), (error, port) => {
if (error) {
reject(error);
return;
}
this.target = target;
resolve();
});
});
});
}
shutdown() { shutdown() {
this.server.forceShutdown(); this.server.forceShutdown();
} }
getTarget() {
if (this.target === null) {
throw new Error('Server not yet started');
}
return this.target;
}
} }
export class TestClient { export class TestClient {
private client: ServiceClient; private client: ServiceClient;
constructor(port: number, useTls: boolean, options?: grpc.ChannelOptions) { constructor(target: string, useTls: boolean, options?: grpc.ChannelOptions) {
let credentials: grpc.ChannelCredentials; let credentials: grpc.ChannelCredentials;
if (useTls) { if (useTls) {
credentials = grpc.credentials.createSsl(ca); credentials = grpc.credentials.createSsl(ca);
} else { } else {
credentials = grpc.credentials.createInsecure(); credentials = grpc.credentials.createInsecure();
} }
this.client = new echoService(`localhost:${port}`, credentials, options); this.client = new echoService(target, credentials, options);
} }
static createFromServer(server: TestServer, options?: grpc.ChannelOptions) { static createFromServer(server: TestServer, options?: grpc.ChannelOptions) {
if (server.port === null) { return new TestClient(server.getTarget(), server.useTls, options);
throw new Error('Cannot create client, server not started');
}
return new TestClient(server.port, server.useTls, options);
} }
waitForReady(deadline: grpc.Deadline, callback: (error?: Error) => void) { waitForReady(deadline: grpc.Deadline, callback: (error?: Error) => void) {

View File

@ -129,6 +129,47 @@ describe('Channel idle timer', () => {
}); });
}); });
describe('Channel idle timer with UDS', () => {
let server: TestServer;
let client: TestClient | null = null;
before(() => {
server = new TestServer(false);
return server.startUds();
});
afterEach(() => {
if (client) {
client.close();
client = null;
}
});
after(() => {
server.shutdown();
});
it('Should be able to make a request after going idle', function (done) {
this.timeout(5000);
client = TestClient.createFromServer(server, {
'grpc.client_idle_timeout_ms': 1000,
});
client.sendRequest(error => {
assert.ifError(error);
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.READY
);
setTimeout(() => {
assert.strictEqual(
client!.getChannelState(),
grpc.connectivityState.IDLE
);
client!.sendRequest(error => {
assert.ifError(error);
done();
});
}, 1100);
});
});
});
describe('Server idle timer', () => { describe('Server idle timer', () => {
let server: TestServer; let server: TestServer;
let client: TestClient | null = null; let client: TestClient | null = null;

View File

@ -811,7 +811,7 @@ describe('pick_first load balancing policy', () => {
before(async () => { before(async () => {
server = new TestServer(false); server = new TestServer(false);
await server.start(); await server.start();
client = new TestClient(server.port!, false, { client = TestClient.createFromServer(server, {
'grpc.service_config': JSON.stringify(serviceConfig), 'grpc.service_config': JSON.stringify(serviceConfig),
}); });
}); });

View File

@ -153,7 +153,7 @@ describe('Server interceptors', () => {
grpc.ServerCredentials.createInsecure(), grpc.ServerCredentials.createInsecure(),
(error, port) => { (error, port) => {
assert.ifError(error); assert.ifError(error);
client = new TestClient(port, false); client = new TestClient(`localhost:${port}`, false);
done(); done();
} }
); );
@ -195,7 +195,7 @@ describe('Server interceptors', () => {
grpc.ServerCredentials.createInsecure(), grpc.ServerCredentials.createInsecure(),
(error, port) => { (error, port) => {
assert.ifError(error); assert.ifError(error);
client = new TestClient(port, false); client = new TestClient(`localhost:${port}`, false);
done(); done();
} }
); );
@ -246,7 +246,7 @@ describe('Server interceptors', () => {
grpc.ServerCredentials.createInsecure(), grpc.ServerCredentials.createInsecure(),
(error, port) => { (error, port) => {
assert.ifError(error); assert.ifError(error);
client = new TestClient(port, false); client = new TestClient(`localhost:${port}`, false);
done(); done();
} }
); );
@ -292,7 +292,7 @@ describe('Server interceptors', () => {
grpc.ServerCredentials.createInsecure(), grpc.ServerCredentials.createInsecure(),
(error, port) => { (error, port) => {
assert.ifError(error); assert.ifError(error);
client = new TestClient(port, false); client = new TestClient(`localhost:${port}`, false);
done(); done();
} }
); );