From 4786f4a0058029c8e71f4e9cc22c57e2b9479bc5 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 2 Mar 2020 17:27:46 -0800 Subject: [PATCH 1/4] grpc_server_add_(in)secure_port returns 0 on error. Reflect that in bind(Async) --- packages/grpc-js/src/server.ts | 2 +- packages/grpc-native-core/src/server.js | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index a3274f14..d90a2c97 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -240,7 +240,7 @@ export class Server { this._setupHandlers(); function onError(err: Error): void { - callback(err, -1); + callback(err, 0); } this.http2Server.once('error', onError); diff --git a/packages/grpc-native-core/src/server.js b/packages/grpc-native-core/src/server.js index 81e56347..e7fd2094 100644 --- a/packages/grpc-native-core/src/server.js +++ b/packages/grpc-native-core/src/server.js @@ -970,7 +970,7 @@ Server.prototype.addProtoService = util.deprecate(function(service, * "address:port" * @param {grpc.ServerCredentials} creds Server credential object to be used for * SSL. Pass an insecure credentials object for an insecure port. - * @return {number} The bound port number. Negative if binding the port failed. + * @return {number} The bound port number. Zero if binding the port failed. */ Server.prototype.bind = function(port, creds) { if (this.started) { @@ -984,7 +984,7 @@ Server.prototype.bind = function(port, creds) { * @callback grpc.Server~bindCallback * @param {Error=} error If non-null, indicates that binding the port failed. * @param {number} port The bound port number. If binding the port fails, this - * will be negative to match the output of bind. + * will be zero to match the output of bind. */ /** @@ -1000,7 +1000,7 @@ Server.prototype.bindAsync = function(port, creds, callback) { * incorrect use of the function, which should not be surfaced asynchronously */ const result = this.bind(port, creds) - if (result < 0) { + if (result == 0) { setImmediate(callback, new Error('Failed to bind port'), result); } else { setImmediate(callback, null, result); From c1d6bf91bce11208c3269140e34b9c2fa00d4467 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 4 Mar 2020 17:17:34 -0800 Subject: [PATCH 2/4] grpc-js: Use resolver to bind server ports --- packages/grpc-js/src/server.ts | 191 +++++++++++++++++++----- packages/grpc-native-core/src/server.js | 2 +- 2 files changed, 152 insertions(+), 41 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index d90a2c97..8e6af3d3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -20,7 +20,7 @@ import { AddressInfo, ListenOptions } from 'net'; import { URL } from 'url'; import { ServiceError } from './call'; -import { Status } from './constants'; +import { Status, LogVerbosity } from './constants'; import { Deserialize, Serialize, ServiceDefinition } from './make-client'; import { Metadata } from './metadata'; import { @@ -46,6 +46,14 @@ import { } from './server-call'; import { ServerCredentials } from './server-credentials'; import { ChannelOptions } from './channel-options'; +import { createResolver, ResolverListener } from './resolver'; +import { log } from './logging'; +import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel'; + +interface BindResult { + port: number; + count: number; +} function noop(): void {} @@ -104,10 +112,8 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) { // tslint:enable:no-any export class Server { - private http2Server: - | http2.Http2Server - | http2.Http2SecureServer - | null = null; + private http2ServerList: (http2.Http2Server | http2.Http2SecureServer)[] = []; + private handlers: Map = new Map< string, UntypedHandler @@ -217,8 +223,6 @@ export class Server { throw new TypeError('callback must be a function'); } - const url = new URL(`http://${port}`); - const options: ListenOptions = { host: url.hostname, port: +url.port }; const serverOptions: http2.ServerOptions = {}; if ('grpc.max_concurrent_streams' in this.options) { serverOptions.settings = { @@ -226,40 +230,147 @@ export class Server { }; } - if (creds._isSecure()) { - const secureServerOptions = Object.assign( - serverOptions, - creds._getSettings()! - ); - this.http2Server = http2.createSecureServer(secureServerOptions); - } else { - this.http2Server = http2.createServer(serverOptions); + const setupServer = (): http2.Http2Server | http2.Http2SecureServer => { + let http2Server: http2.Http2Server | http2.Http2SecureServer; + if (creds._isSecure()) { + const secureServerOptions = Object.assign( + serverOptions, + creds._getSettings()! + ); + http2Server = http2.createSecureServer(secureServerOptions); + } else { + http2Server = http2.createServer(serverOptions); + } + + http2Server.setTimeout(0, noop); + this._setupHandlers(http2Server); + return http2Server; } - this.http2Server.setTimeout(0, noop); - this._setupHandlers(); + const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise => { + if (addressList.length === 0) { + if (previousCount > 0) { + return Promise.resolve({port: portNum, count: previousCount}); + } else { + return Promise.reject(new Error('No addresses bound')); + } + } + return Promise.all(addressList.map(address => { + let addr: SubchannelAddress; + if (isTcpSubchannelAddress(address)) { + addr = { + host: (address as TcpSubchannelAddress).host, + port: portNum + }; + } else { + addr = address + } + + const http2Server = setupServer(); + return new Promise((resolve, reject) => { + function onError(err: Error): void { + resolve(err); + } - function onError(err: Error): void { - callback(err, 0); + http2Server.once('error', onError); + + http2Server.listen(addr, () => { + const boundAddress = http2Server.address()!; + if (typeof boundAddress === 'string') { + resolve(portNum); + } else { + resolve(boundAddress.port); + } + http2Server.removeListener('error', onError); + }); + }) + })).then(results => { + let count = 0; + for (const result of results) { + if (typeof result === 'number') { + count += 1; + if (result !== portNum) { + throw new Error('Invalid state: multiple port numbers added from single address'); + } + } + } + return { + port: portNum, + count: count + previousCount + }; + }); } - this.http2Server.once('error', onError); + const bindWildcardPort = (addressList: SubchannelAddress[]): Promise => { + if (addressList.length === 0) { + return Promise.reject(new Error('No addresses bound')); + } + const address = addressList[0]; + const http2Server = setupServer(); + return new Promise((resolve, reject) => { + function onError(err: Error): void { + resolve(bindWildcardPort(addressList.slice(1))); + } - this.http2Server.listen(options, () => { - const server = this.http2Server as - | http2.Http2Server - | http2.Http2SecureServer; - const port = (server.address() as AddressInfo).port; + http2Server.once('error', onError); - server.removeListener('error', onError); - callback(null, port); - }); + http2Server.listen(address, () => { + resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1)); + http2Server.removeListener('error', onError); + }); + }); + } + + const resolverListener: ResolverListener = { + onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => { + if (addressList.length === 0) { + callback(new Error(`No addresses resolved for port ${port}`), 0); + return; + } + let bindResultPromise: Promise; + if (isTcpSubchannelAddress(addressList[0])) { + if (addressList[0].port === 0) { + bindResultPromise = bindWildcardPort(addressList); + } else { + bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0); + } + } else{ + // Use an arbitrary non-zero port for non-TCP addresses + bindResultPromise = bindSpecificPort(addressList, 1, 0); + } + bindResultPromise.then(bindResult => { + if (bindResult.count === 0) { + const errorString = `No address added out of total ${addressList.length} resolved`; + log(LogVerbosity.ERROR, errorString); + callback(new Error(errorString), 0); + } else { + if (bindResult.count < addressList.length) { + log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`); + } + callback(null, bindResult.port); + } + }, (error) => { + const errorString = `No address added out of total ${addressList.length} resolved`; + log(LogVerbosity.ERROR, errorString); + callback(new Error(errorString), 0); + }); + }, + onError: (error) => { + callback(new Error(error.details), 0); + } + }; + + const resolver = createResolver(port, resolverListener); + resolver.updateResolution(); } forceShutdown(): void { // Close the server if it is still running. - if (this.http2Server && this.http2Server.listening) { - this.http2Server.close(); + + for (const http2Server of this.http2ServerList) { + if (http2Server.listening) { + http2Server.close(); + } } this.started = false; @@ -296,7 +407,7 @@ export class Server { } start(): void { - if (this.http2Server === null || this.http2Server.listening !== true) { + if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) { throw new Error('server must be bound in order to start'); } @@ -321,9 +432,11 @@ export class Server { // Close the server if necessary. this.started = false; - if (this.http2Server && this.http2Server.listening) { - pendingChecks++; - this.http2Server.close(maybeCallback); + for (const http2Server of this.http2ServerList) { + if (http2Server.listening) { + pendingChecks++; + http2Server.close(maybeCallback); + } } // If any sessions are active, close them gracefully. @@ -331,8 +444,6 @@ export class Server { this.sessions.forEach(session => { session.close(maybeCallback); }); - - // If the server is closed and there are no active sessions, just call back. if (pendingChecks === 0) { callback(); } @@ -342,12 +453,12 @@ export class Server { throw new Error('Not yet implemented'); } - private _setupHandlers(): void { - if (this.http2Server === null) { + private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void { + if (http2Server === null) { return; } - this.http2Server.on( + http2Server.on( 'stream', (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; @@ -416,7 +527,7 @@ export class Server { } ); - this.http2Server.on('session', session => { + http2Server.on('session', session => { if (!this.started) { session.destroy(); return; diff --git a/packages/grpc-native-core/src/server.js b/packages/grpc-native-core/src/server.js index e7fd2094..d72de018 100644 --- a/packages/grpc-native-core/src/server.js +++ b/packages/grpc-native-core/src/server.js @@ -1000,7 +1000,7 @@ Server.prototype.bindAsync = function(port, creds, callback) { * incorrect use of the function, which should not be surfaced asynchronously */ const result = this.bind(port, creds) - if (result == 0) { + if (result === 0) { setImmediate(callback, new Error('Failed to bind port'), result); } else { setImmediate(callback, null, result); From 3cbb46b1f78bef6cd75c715769eaac9baf4cbd70 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 5 Mar 2020 12:32:08 -0800 Subject: [PATCH 3/4] Don't explicitly reject any promises --- packages/grpc-js/src/server.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 8e6af3d3..fd2af1fc 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -249,11 +249,7 @@ export class Server { const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise => { if (addressList.length === 0) { - if (previousCount > 0) { - return Promise.resolve({port: portNum, count: previousCount}); - } else { - return Promise.reject(new Error('No addresses bound')); - } + return Promise.resolve({port: portNum, count: previousCount}); } return Promise.all(addressList.map(address => { let addr: SubchannelAddress; @@ -303,7 +299,7 @@ export class Server { const bindWildcardPort = (addressList: SubchannelAddress[]): Promise => { if (addressList.length === 0) { - return Promise.reject(new Error('No addresses bound')); + return Promise.resolve({port: 0, count: 0}); } const address = addressList[0]; const http2Server = setupServer(); From 96d4d6acbaa5288f007941d4e360b3af49f5a55c Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 5 Mar 2020 13:01:43 -0800 Subject: [PATCH 4/4] Actually add listening http2 servers to server list --- packages/grpc-js/src/server.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index fd2af1fc..62ed68ab 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -271,6 +271,7 @@ export class Server { http2Server.once('error', onError); http2Server.listen(addr, () => { + this.http2ServerList.push(http2Server); const boundAddress = http2Server.address()!; if (typeof boundAddress === 'string') { resolve(portNum); @@ -311,6 +312,7 @@ export class Server { http2Server.once('error', onError); http2Server.listen(address, () => { + this.http2ServerList.push(http2Server); resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1)); http2Server.removeListener('error', onError); });