mirror of https://github.com/grpc/grpc-node.git
grpc-js: Use resolver to bind server ports
This commit is contained in:
parent
4786f4a005
commit
c1d6bf91bc
|
|
@ -20,7 +20,7 @@ import { AddressInfo, ListenOptions } from 'net';
|
||||||
import { URL } from 'url';
|
import { URL } from 'url';
|
||||||
|
|
||||||
import { ServiceError } from './call';
|
import { ServiceError } from './call';
|
||||||
import { Status } from './constants';
|
import { Status, LogVerbosity } from './constants';
|
||||||
import { Deserialize, Serialize, ServiceDefinition } from './make-client';
|
import { Deserialize, Serialize, ServiceDefinition } from './make-client';
|
||||||
import { Metadata } from './metadata';
|
import { Metadata } from './metadata';
|
||||||
import {
|
import {
|
||||||
|
|
@ -46,6 +46,14 @@ import {
|
||||||
} from './server-call';
|
} from './server-call';
|
||||||
import { ServerCredentials } from './server-credentials';
|
import { ServerCredentials } from './server-credentials';
|
||||||
import { ChannelOptions } from './channel-options';
|
import { ChannelOptions } from './channel-options';
|
||||||
|
import { createResolver, ResolverListener } from './resolver';
|
||||||
|
import { log } from './logging';
|
||||||
|
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel';
|
||||||
|
|
||||||
|
interface BindResult {
|
||||||
|
port: number;
|
||||||
|
count: number;
|
||||||
|
}
|
||||||
|
|
||||||
function noop(): void {}
|
function noop(): void {}
|
||||||
|
|
||||||
|
|
@ -104,10 +112,8 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) {
|
||||||
// tslint:enable:no-any
|
// tslint:enable:no-any
|
||||||
|
|
||||||
export class Server {
|
export class Server {
|
||||||
private http2Server:
|
private http2ServerList: (http2.Http2Server | http2.Http2SecureServer)[] = [];
|
||||||
| http2.Http2Server
|
|
||||||
| http2.Http2SecureServer
|
|
||||||
| null = null;
|
|
||||||
private handlers: Map<string, UntypedHandler> = new Map<
|
private handlers: Map<string, UntypedHandler> = new Map<
|
||||||
string,
|
string,
|
||||||
UntypedHandler
|
UntypedHandler
|
||||||
|
|
@ -217,8 +223,6 @@ export class Server {
|
||||||
throw new TypeError('callback must be a function');
|
throw new TypeError('callback must be a function');
|
||||||
}
|
}
|
||||||
|
|
||||||
const url = new URL(`http://${port}`);
|
|
||||||
const options: ListenOptions = { host: url.hostname, port: +url.port };
|
|
||||||
const serverOptions: http2.ServerOptions = {};
|
const serverOptions: http2.ServerOptions = {};
|
||||||
if ('grpc.max_concurrent_streams' in this.options) {
|
if ('grpc.max_concurrent_streams' in this.options) {
|
||||||
serverOptions.settings = {
|
serverOptions.settings = {
|
||||||
|
|
@ -226,40 +230,147 @@ export class Server {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (creds._isSecure()) {
|
const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
|
||||||
const secureServerOptions = Object.assign(
|
let http2Server: http2.Http2Server | http2.Http2SecureServer;
|
||||||
serverOptions,
|
if (creds._isSecure()) {
|
||||||
creds._getSettings()!
|
const secureServerOptions = Object.assign(
|
||||||
);
|
serverOptions,
|
||||||
this.http2Server = http2.createSecureServer(secureServerOptions);
|
creds._getSettings()!
|
||||||
} else {
|
);
|
||||||
this.http2Server = http2.createServer(serverOptions);
|
http2Server = http2.createSecureServer(secureServerOptions);
|
||||||
|
} else {
|
||||||
|
http2Server = http2.createServer(serverOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
http2Server.setTimeout(0, noop);
|
||||||
|
this._setupHandlers(http2Server);
|
||||||
|
return http2Server;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.http2Server.setTimeout(0, noop);
|
const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise<BindResult> => {
|
||||||
this._setupHandlers();
|
if (addressList.length === 0) {
|
||||||
|
if (previousCount > 0) {
|
||||||
|
return Promise.resolve({port: portNum, count: previousCount});
|
||||||
|
} else {
|
||||||
|
return Promise.reject<BindResult>(new Error('No addresses bound'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Promise.all(addressList.map(address => {
|
||||||
|
let addr: SubchannelAddress;
|
||||||
|
if (isTcpSubchannelAddress(address)) {
|
||||||
|
addr = {
|
||||||
|
host: (address as TcpSubchannelAddress).host,
|
||||||
|
port: portNum
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
addr = address
|
||||||
|
}
|
||||||
|
|
||||||
|
const http2Server = setupServer();
|
||||||
|
return new Promise<number|Error>((resolve, reject) => {
|
||||||
|
function onError(err: Error): void {
|
||||||
|
resolve(err);
|
||||||
|
}
|
||||||
|
|
||||||
function onError(err: Error): void {
|
http2Server.once('error', onError);
|
||||||
callback(err, 0);
|
|
||||||
|
http2Server.listen(addr, () => {
|
||||||
|
const boundAddress = http2Server.address()!;
|
||||||
|
if (typeof boundAddress === 'string') {
|
||||||
|
resolve(portNum);
|
||||||
|
} else {
|
||||||
|
resolve(boundAddress.port);
|
||||||
|
}
|
||||||
|
http2Server.removeListener('error', onError);
|
||||||
|
});
|
||||||
|
})
|
||||||
|
})).then(results => {
|
||||||
|
let count = 0;
|
||||||
|
for (const result of results) {
|
||||||
|
if (typeof result === 'number') {
|
||||||
|
count += 1;
|
||||||
|
if (result !== portNum) {
|
||||||
|
throw new Error('Invalid state: multiple port numbers added from single address');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
port: portNum,
|
||||||
|
count: count + previousCount
|
||||||
|
};
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.http2Server.once('error', onError);
|
const bindWildcardPort = (addressList: SubchannelAddress[]): Promise<BindResult> => {
|
||||||
|
if (addressList.length === 0) {
|
||||||
|
return Promise.reject<BindResult>(new Error('No addresses bound'));
|
||||||
|
}
|
||||||
|
const address = addressList[0];
|
||||||
|
const http2Server = setupServer();
|
||||||
|
return new Promise<BindResult>((resolve, reject) => {
|
||||||
|
function onError(err: Error): void {
|
||||||
|
resolve(bindWildcardPort(addressList.slice(1)));
|
||||||
|
}
|
||||||
|
|
||||||
this.http2Server.listen(options, () => {
|
http2Server.once('error', onError);
|
||||||
const server = this.http2Server as
|
|
||||||
| http2.Http2Server
|
|
||||||
| http2.Http2SecureServer;
|
|
||||||
const port = (server.address() as AddressInfo).port;
|
|
||||||
|
|
||||||
server.removeListener('error', onError);
|
http2Server.listen(address, () => {
|
||||||
callback(null, port);
|
resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1));
|
||||||
});
|
http2Server.removeListener('error', onError);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const resolverListener: ResolverListener = {
|
||||||
|
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
|
||||||
|
if (addressList.length === 0) {
|
||||||
|
callback(new Error(`No addresses resolved for port ${port}`), 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let bindResultPromise: Promise<BindResult>;
|
||||||
|
if (isTcpSubchannelAddress(addressList[0])) {
|
||||||
|
if (addressList[0].port === 0) {
|
||||||
|
bindResultPromise = bindWildcardPort(addressList);
|
||||||
|
} else {
|
||||||
|
bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
|
||||||
|
}
|
||||||
|
} else{
|
||||||
|
// Use an arbitrary non-zero port for non-TCP addresses
|
||||||
|
bindResultPromise = bindSpecificPort(addressList, 1, 0);
|
||||||
|
}
|
||||||
|
bindResultPromise.then(bindResult => {
|
||||||
|
if (bindResult.count === 0) {
|
||||||
|
const errorString = `No address added out of total ${addressList.length} resolved`;
|
||||||
|
log(LogVerbosity.ERROR, errorString);
|
||||||
|
callback(new Error(errorString), 0);
|
||||||
|
} else {
|
||||||
|
if (bindResult.count < addressList.length) {
|
||||||
|
log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
|
||||||
|
}
|
||||||
|
callback(null, bindResult.port);
|
||||||
|
}
|
||||||
|
}, (error) => {
|
||||||
|
const errorString = `No address added out of total ${addressList.length} resolved`;
|
||||||
|
log(LogVerbosity.ERROR, errorString);
|
||||||
|
callback(new Error(errorString), 0);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
onError: (error) => {
|
||||||
|
callback(new Error(error.details), 0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const resolver = createResolver(port, resolverListener);
|
||||||
|
resolver.updateResolution();
|
||||||
}
|
}
|
||||||
|
|
||||||
forceShutdown(): void {
|
forceShutdown(): void {
|
||||||
// Close the server if it is still running.
|
// Close the server if it is still running.
|
||||||
if (this.http2Server && this.http2Server.listening) {
|
|
||||||
this.http2Server.close();
|
for (const http2Server of this.http2ServerList) {
|
||||||
|
if (http2Server.listening) {
|
||||||
|
http2Server.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.started = false;
|
this.started = false;
|
||||||
|
|
@ -296,7 +407,7 @@ export class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
start(): void {
|
start(): void {
|
||||||
if (this.http2Server === null || this.http2Server.listening !== true) {
|
if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) {
|
||||||
throw new Error('server must be bound in order to start');
|
throw new Error('server must be bound in order to start');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -321,9 +432,11 @@ export class Server {
|
||||||
// Close the server if necessary.
|
// Close the server if necessary.
|
||||||
this.started = false;
|
this.started = false;
|
||||||
|
|
||||||
if (this.http2Server && this.http2Server.listening) {
|
for (const http2Server of this.http2ServerList) {
|
||||||
pendingChecks++;
|
if (http2Server.listening) {
|
||||||
this.http2Server.close(maybeCallback);
|
pendingChecks++;
|
||||||
|
http2Server.close(maybeCallback);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any sessions are active, close them gracefully.
|
// If any sessions are active, close them gracefully.
|
||||||
|
|
@ -331,8 +444,6 @@ export class Server {
|
||||||
this.sessions.forEach(session => {
|
this.sessions.forEach(session => {
|
||||||
session.close(maybeCallback);
|
session.close(maybeCallback);
|
||||||
});
|
});
|
||||||
|
|
||||||
// If the server is closed and there are no active sessions, just call back.
|
|
||||||
if (pendingChecks === 0) {
|
if (pendingChecks === 0) {
|
||||||
callback();
|
callback();
|
||||||
}
|
}
|
||||||
|
|
@ -342,12 +453,12 @@ export class Server {
|
||||||
throw new Error('Not yet implemented');
|
throw new Error('Not yet implemented');
|
||||||
}
|
}
|
||||||
|
|
||||||
private _setupHandlers(): void {
|
private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void {
|
||||||
if (this.http2Server === null) {
|
if (http2Server === null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.http2Server.on(
|
http2Server.on(
|
||||||
'stream',
|
'stream',
|
||||||
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
|
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
|
||||||
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
|
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
|
||||||
|
|
@ -416,7 +527,7 @@ export class Server {
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
this.http2Server.on('session', session => {
|
http2Server.on('session', session => {
|
||||||
if (!this.started) {
|
if (!this.started) {
|
||||||
session.destroy();
|
session.destroy();
|
||||||
return;
|
return;
|
||||||
|
|
|
||||||
|
|
@ -1000,7 +1000,7 @@ Server.prototype.bindAsync = function(port, creds, callback) {
|
||||||
* incorrect use of the function, which should not be surfaced asynchronously
|
* incorrect use of the function, which should not be surfaced asynchronously
|
||||||
*/
|
*/
|
||||||
const result = this.bind(port, creds)
|
const result = this.bind(port, creds)
|
||||||
if (result == 0) {
|
if (result === 0) {
|
||||||
setImmediate(callback, new Error('Failed to bind port'), result);
|
setImmediate(callback, new Error('Failed to bind port'), result);
|
||||||
} else {
|
} else {
|
||||||
setImmediate(callback, null, result);
|
setImmediate(callback, null, result);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue