Merge branch 'master' into grpc-js_max_message_size

This commit is contained in:
Michael Lumish 2020-04-13 14:13:03 -07:00 committed by GitHub
commit 70b2a954e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 527 additions and 381 deletions

View File

@ -0,0 +1,10 @@
{
"root": true,
"extends": "./node_modules/gts",
"rules": {
"node/no-unpublished-import": ["error", {
"tryExtensions": [".ts", ".js", ".json", ".node"]
}],
"@typescript-eslint/no-unused-vars": "off"
}
}

View File

@ -26,7 +26,7 @@
"@types/semver": "^6.0.1",
"clang-format": "^1.0.55",
"execa": "^2.0.3",
"gts": "^1.1.0",
"gts": "^2.0.0",
"gulp": "^4.0.2",
"gulp-mocha": "^6.0.0",
"lodash": "^4.17.4",
@ -46,11 +46,11 @@
"clean": "gts clean",
"compile": "tsc -p .",
"format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts",
"lint": "tslint -c node_modules/google-ts-style/tslint.json -p . -t codeFrame --type-check",
"lint": "npm run check",
"prepare": "npm run compile",
"test": "gulp test",
"check": "gts check",
"fix": "gts fix",
"check": "gts check src/**/*.ts",
"fix": "gts fix src/**/*.ts",
"pretest": "npm run compile",
"posttest": "npm run check"
},

View File

@ -15,7 +15,6 @@
*
*/
import { CallCredentials } from './call-credentials';
import { Call } from './call-stream';
import { Channel } from './channel';
import { BaseFilter, Filter, FilterFactory } from './filter';

View File

@ -16,7 +16,6 @@
*/
import { Metadata } from './metadata';
import { Call } from '.';
export interface CallMetadataOptions {
service_url: string;
@ -79,7 +78,7 @@ class ComposedCallCredentials extends CallCredentials {
async generateMetadata(options: CallMetadataOptions): Promise<Metadata> {
const base: Metadata = new Metadata();
const generated: Metadata[] = await Promise.all(
this.creds.map(cred => cred.generateMetadata(options))
this.creds.map((cred) => cred.generateMetadata(options))
);
for (const gen of generated) {
base.merge(gen);

View File

@ -69,7 +69,7 @@ export interface MetadataListener {
}
export interface MessageListener {
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(message: any, next: (message: any) => void): void;
}
@ -90,7 +90,7 @@ export type Listener = Partial<FullListener>;
*/
export interface InterceptingListener {
onReceiveMetadata(metadata: Metadata): void;
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any): void;
onReceiveStatus(status: StatusObject): void;
}
@ -113,16 +113,16 @@ export class InterceptingListenerImpl implements InterceptingListener {
) {}
onReceiveMetadata(metadata: Metadata): void {
this.listener.onReceiveMetadata(metadata, metadata => {
this.listener.onReceiveMetadata(metadata, (metadata) => {
this.nextListener.onReceiveMetadata(metadata);
});
}
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any): void {
/* If this listener processes messages asynchronously, the last message may
* be reordered with respect to the status */
this.processingMessage = true;
this.listener.onReceiveMessage(message, msg => {
this.listener.onReceiveMessage(message, (msg) => {
this.processingMessage = false;
this.nextListener.onReceiveMessage(msg);
if (this.pendingStatus) {
@ -131,7 +131,7 @@ export class InterceptingListenerImpl implements InterceptingListener {
});
}
onReceiveStatus(status: StatusObject): void {
this.listener.onReceiveStatus(status, processedStatus => {
this.listener.onReceiveStatus(status, (processedStatus) => {
if (this.processingMessage) {
this.pendingStatus = processedStatus;
} else {
@ -224,7 +224,9 @@ export class Http2CallStream implements Call {
/* Precondition: this.finalStatus !== null */
if (!this.statusOutput) {
this.statusOutput = true;
const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus!);
const filteredStatus = this.filterStack.receiveTrailers(
this.finalStatus!
);
this.listener!.onReceiveStatus(filteredStatus);
if (this.subchannel) {
this.subchannel.callUnref();
@ -355,7 +357,7 @@ export class Http2CallStream implements Call {
private handleTrailers(headers: http2.IncomingHttpHeaders) {
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n'
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
this.trace('Received server trailers:\n' + headersString);
let metadata: Metadata;
@ -366,7 +368,10 @@ export class Http2CallStream implements Call {
}
const metadataMap = metadata.getMap();
let code: Status = this.mappedStatusCode;
if (code === Status.UNKNOWN && typeof metadataMap['grpc-status'] === 'string') {
if (
code === Status.UNKNOWN &&
typeof metadataMap['grpc-status'] === 'string'
) {
const receivedStatus = Number(metadataMap['grpc-status']);
if (receivedStatus in Status) {
code = receivedStatus;
@ -378,7 +383,9 @@ export class Http2CallStream implements Call {
if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message']);
metadata.remove('grpc-message');
this.trace('received status details string "' + details + '" from server');
this.trace(
'received status details string "' + details + '" from server'
);
}
const status: StatusObject = { code, details, metadata };
let finalStatus;
@ -415,7 +422,7 @@ export class Http2CallStream implements Call {
stream.on('response', (headers, flags) => {
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n'
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
this.trace('Received server headers:\n' + headersString);
switch (headers[':status']) {
@ -578,7 +585,9 @@ export class Http2CallStream implements Call {
}
cancelWithStatus(status: Status, details: string): void {
this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
this.trace(
'cancelWithStatus code: ' + status + ' details: "' + details + '"'
);
this.destroyHttp2Stream();
this.endCall({ code: status, details, metadata: new Metadata() });
}
@ -653,7 +662,7 @@ export class Http2CallStream implements Call {
};
const cb: WriteCallback = context.callback ?? (() => {});
this.isWriteFilterPending = true;
this.filterStack.sendMessage(Promise.resolve(writeObj)).then(message => {
this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
this.isWriteFilterPending = false;
if (this.http2Stream === null) {
this.trace(

View File

@ -100,9 +100,7 @@ export class ClientUnaryCallImpl extends EventEmitter
export class ClientReadableStreamImpl<ResponseType> extends Readable
implements ClientReadableStream<ResponseType> {
public call?: InterceptingCallInterface;
constructor(
readonly deserialize: (chunk: Buffer) => ResponseType
) {
constructor(readonly deserialize: (chunk: Buffer) => ResponseType) {
super({ objectMode: true });
}
@ -122,9 +120,7 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
export class ClientWritableStreamImpl<RequestType> extends Writable
implements ClientWritableStream<RequestType> {
public call?: InterceptingCallInterface;
constructor(
readonly serialize: (value: RequestType) => Buffer
) {
constructor(readonly serialize: (value: RequestType) => Buffer) {
super({ objectMode: true });
}
@ -140,7 +136,7 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
const context: MessageContext = {
callback: cb,
};
const flags: number = Number(encoding);
const flags = Number(encoding);
if (!Number.isNaN(flags)) {
context.flags = flags;
}
@ -179,7 +175,7 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
const context: MessageContext = {
callback: cb,
};
const flags: number = Number(encoding);
const flags = Number(encoding);
if (!Number.isNaN(flags)) {
context.flags = flags;
}

View File

@ -20,7 +20,7 @@ import { ConnectionOptions, createSecureContext, PeerCertificate } from 'tls';
import { CallCredentials } from './call-credentials';
import { CIPHER_SUITES, getDefaultRootsData } from './tls-helpers';
// tslint:disable-next-line:no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function verifyIsBufferOrNull(obj: any, friendlyName: string): void {
if (obj && !(obj instanceof Buffer)) {
throw new TypeError(`${friendlyName}, if provided, must be a Buffer.`);

View File

@ -34,7 +34,6 @@ import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority } from './resolver';
import { LoadBalancingConfig } from './load-balancing-config';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
@ -113,7 +112,7 @@ export interface Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
propagateFlags: number | null | undefined
): Call;
}
@ -145,11 +144,23 @@ export class ChannelImplementation implements Channel {
throw new TypeError('Channel target must be a string');
}
if (!(credentials instanceof ChannelCredentials)) {
throw new TypeError('Channel credentials must be a ChannelCredentials object');
throw new TypeError(
'Channel credentials must be a ChannelCredentials object'
);
}
if (options) {
if ((typeof options !== 'object') || !Object.values(options).every(value => typeof value === 'string' || typeof value === 'number' || typeof value === 'undefined')) {
throw new TypeError('Channel options must be an object with string or number values');
if (
typeof options !== 'object' ||
!Object.values(options).every(
(value) =>
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'undefined'
)
) {
throw new TypeError(
'Channel options must be an object with string or number values'
);
}
}
/* The global boolean parameter to getSubchannelPool has the inverse meaning to what
@ -267,7 +278,7 @@ export class ChannelImplementation implements Channel {
callStream.filterStack
.sendMetadata(Promise.resolve(callMetadata.clone()))
.then(
finalMetadata => {
(finalMetadata) => {
const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
if (subchannelState === ConnectivityState.READY) {
try {
@ -276,28 +287,31 @@ export class ChannelImplementation implements Channel {
callStream
);
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ERR_HTTP2_GOAWAY_SESSION') {
if (
(error as NodeJS.ErrnoException).code ===
'ERR_HTTP2_GOAWAY_SESSION'
) {
/* An error here indicates that something went wrong with
* the picked subchannel's http2 stream right before we
* tried to start the stream. We are handling a promise
* result here, so this is asynchronous with respect to the
* original tryPick call, so calling it again is not
* recursive. We call tryPick immediately instead of
* queueing this pick again because handling the queue is
* triggered by state changes, and we want to immediately
* check if the state has already changed since the
* previous tryPick call. We do this instead of cancelling
* the stream because the correct behavior may be
* re-queueing instead, based on the logic in the rest of
* tryPick */
* the picked subchannel's http2 stream right before we
* tried to start the stream. We are handling a promise
* result here, so this is asynchronous with respect to the
* original tryPick call, so calling it again is not
* recursive. We call tryPick immediately instead of
* queueing this pick again because handling the queue is
* triggered by state changes, and we want to immediately
* check if the state has already changed since the
* previous tryPick call. We do this instead of cancelling
* the stream because the correct behavior may be
* re-queueing instead, based on the logic in the rest of
* tryPick */
trace(
LogVerbosity.INFO,
'channel',
'Failed to start call on picked subchannel ' +
pickResult.subchannel!.getAddress() +
' with error ' +
(error as Error).message +
'. Retrying pick'
pickResult.subchannel!.getAddress() +
' with error ' +
(error as Error).message +
'. Retrying pick'
);
this.tryPick(callStream, callMetadata);
} else {
@ -305,12 +319,15 @@ export class ChannelImplementation implements Channel {
LogVerbosity.INFO,
'channel',
'Failed to start call on picked subchanel ' +
pickResult.subchannel!.getAddress() +
' with error ' +
(error as Error).message +
'. Ending call'
pickResult.subchannel!.getAddress() +
' with error ' +
(error as Error).message +
'. Ending call'
);
callStream.cancelWithStatus(
Status.INTERNAL,
'Failed to start HTTP/2 stream'
);
callStream.cancelWithStatus(Status.INTERNAL, 'Failed to start HTTP/2 stream');
}
}
} else {
@ -362,7 +379,7 @@ export class ChannelImplementation implements Channel {
watcherObject: ConnectivityStateWatcher
) {
const watcherIndex = this.connectivityStateWatchers.findIndex(
value => value === watcherObject
(value) => value === watcherObject
);
if (watcherIndex >= 0) {
this.connectivityStateWatchers.splice(watcherIndex, 1);
@ -445,14 +462,16 @@ export class ChannelImplementation implements Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
propagateFlags: number | null | undefined
): Call {
if (typeof method !== 'string') {
throw new TypeError('Channel#createCall: method must be a string');
}
if (!(typeof deadline === 'number' || deadline instanceof Date)) {
throw new TypeError('Channel#createCall: deadline must be a number or Date');
throw new TypeError(
'Channel#createCall: deadline must be a number or Date'
);
}
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
throw new Error('Channel has been shut down');

View File

@ -18,27 +18,22 @@
import { Metadata } from './metadata';
import {
StatusObject,
CallStreamOptions,
Listener,
MetadataListener,
MessageListener,
StatusListener,
FullListener,
InterceptingListener,
WriteObject,
WriteCallback,
InterceptingListenerImpl,
isInterceptingListener,
MessageContext,
Http2CallStream,
Deadline,
Call,
} from './call-stream';
import { Status } from './constants';
import { Channel } from './channel';
import { CallOptions } from './client';
import { CallCredentials } from './call-credentials';
import { ClientMethodDefinition, Serialize } from './make-client';
import { ClientMethodDefinition } from './make-client';
/**
* Error class associated with passing both interceptors and interceptor
@ -64,7 +59,7 @@ export interface MetadataRequester {
}
export interface MessageRequester {
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(message: any, next: (message: any) => void): void;
}
@ -180,16 +175,16 @@ const defaultRequester: FullRequester = {
sendMessage: (message, next) => {
next(message);
},
halfClose: next => {
halfClose: (next) => {
next();
},
cancel: next => {
cancel: (next) => {
next();
},
};
export interface InterceptorOptions extends CallOptions {
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
method_definition: ClientMethodDefinition<any, any>;
}
@ -197,9 +192,9 @@ export interface InterceptingCallInterface {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessageWithContext(context: MessageContext, message: any): void;
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any): void;
startRead(): void;
halfClose(): void;
@ -254,13 +249,13 @@ export class InterceptingCall implements InterceptingCallInterface {
const fullInterceptingListener: InterceptingListener = {
onReceiveMetadata:
interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
(metadata => {}),
((metadata) => {}),
onReceiveMessage:
interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
(message => {}),
((message) => {}),
onReceiveStatus:
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
(status => {}),
((status) => {}),
};
this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
let finalInterceptingListener: InterceptingListener;
@ -283,10 +278,10 @@ export class InterceptingCall implements InterceptingCallInterface {
this.nextCall.start(md, finalInterceptingListener);
});
}
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessageWithContext(context: MessageContext, message: any): void {
this.processingMessage = true;
this.requester.sendMessage(message, finalMessage => {
this.requester.sendMessage(message, (finalMessage) => {
this.processingMessage = false;
this.nextCall.sendMessageWithContext(context, finalMessage);
if (this.pendingHalfClose) {
@ -294,7 +289,7 @@ export class InterceptingCall implements InterceptingCallInterface {
}
});
}
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any): void {
this.sendMessageWithContext({}, message);
}
@ -343,9 +338,9 @@ function getCall(channel: Channel, path: string, options: CallOptions): Call {
* object and handles serialization and deseraizliation.
*/
class BaseInterceptingCall implements InterceptingCallInterface {
// tslint:disable-next-line no-any
constructor(
protected call: Call,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected methodDefinition: ClientMethodDefinition<any, any>
) {}
cancelWithStatus(status: Status, details: string): void {
@ -357,7 +352,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
setCredentials(credentials: CallCredentials): void {
this.call.setCredentials(credentials);
}
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessageWithContext(context: MessageContext, message: any): void {
let serialized: Buffer;
try {
@ -367,7 +362,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
this.call.cancelWithStatus(Status.INTERNAL, 'Serialization failure');
}
}
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any) {
this.sendMessageWithContext({}, message);
}
@ -377,11 +372,11 @@ class BaseInterceptingCall implements InterceptingCallInterface {
): void {
let readError: StatusObject | null = null;
this.call.start(metadata, {
onReceiveMetadata: metadata => {
onReceiveMetadata: (metadata) => {
interceptingListener?.onReceiveMetadata?.(metadata);
},
onReceiveMessage: message => {
// tslint:disable-next-line no-any
onReceiveMessage: (message) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
@ -395,7 +390,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
this.call.cancelWithStatus(readError.code, readError.details);
}
},
onReceiveStatus: status => {
onReceiveStatus: (status) => {
if (readError) {
interceptingListener?.onReceiveStatus?.(readError);
} else {
@ -418,7 +413,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
*/
class BaseUnaryInterceptingCall extends BaseInterceptingCall
implements InterceptingCallInterface {
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
super(call, methodDefinition);
}
@ -426,8 +421,8 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall
let receivedMessage = false;
const wrapperListener: InterceptingListener = {
onReceiveMetadata:
listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}),
// tslint:disable-next-line no-any
listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage: (message: any) => {
receivedMessage = true;
listener?.onReceiveMessage?.(message);
@ -451,10 +446,10 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall
class BaseStreamingInterceptingCall extends BaseInterceptingCall
implements InterceptingCallInterface {}
// tslint:disable-next-line no-any
function getBottomInterceptingCall(
channel: Channel,
options: InterceptorOptions,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
methodDefinition: ClientMethodDefinition<any, any>
) {
const call = getCall(channel, methodDefinition.path, options);
@ -474,7 +469,7 @@ export interface Interceptor {
}
export interface InterceptorProvider {
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
@ -485,9 +480,9 @@ export interface InterceptorArguments {
callInterceptorProviders: InterceptorProvider[];
}
// tslint:disable-next-line no-any
export function getInterceptingCall(
interceptorArgs: InterceptorArguments,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
methodDefinition: ClientMethodDefinition<any, any>,
options: CallOptions,
channel: Channel
@ -519,21 +514,21 @@ export function getInterceptingCall(
interceptors = ([] as Interceptor[])
.concat(
interceptorArgs.callInterceptors,
interceptorArgs.callInterceptorProviders.map(provider =>
interceptorArgs.callInterceptorProviders.map((provider) =>
provider(methodDefinition)
)
)
.filter(interceptor => interceptor);
.filter((interceptor) => interceptor);
// Filter out falsy values when providers return nothing
} else {
interceptors = ([] as Interceptor[])
.concat(
interceptorArgs.clientInterceptors,
interceptorArgs.clientInterceptorProviders.map(provider =>
interceptorArgs.clientInterceptorProviders.map((provider) =>
provider(methodDefinition)
)
)
.filter(interceptor => interceptor);
.filter((interceptor) => interceptor);
// Filter out falsy values when providers return nothing
}
const interceptorOptions = Object.assign({}, options, {
@ -548,14 +543,10 @@ export function getInterceptingCall(
* channel. */
const getCall: NextCall = interceptors.reduceRight<NextCall>(
(nextCall: NextCall, nextInterceptor: Interceptor) => {
return currentOptions => nextInterceptor(currentOptions, nextCall);
return (currentOptions) => nextInterceptor(currentOptions, nextCall);
},
(finalOptions: InterceptorOptions) =>
getBottomInterceptingCall(
channel,
finalOptions,
methodDefinition
)
getBottomInterceptingCall(channel, finalOptions, methodDefinition)
);
return getCall(interceptorOptions);
}

View File

@ -29,18 +29,13 @@ import {
SurfaceCall,
} from './call';
import { CallCredentials } from './call-credentials';
import {
Deadline,
StatusObject,
WriteObject,
InterceptingListener,
} from './call-stream';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
import { ClientMethodDefinition, MethodDefinition } from './make-client';
import { ClientMethodDefinition } from './make-client';
import {
getInterceptingCall,
Interceptor,
@ -48,7 +43,12 @@ import {
InterceptorArguments,
InterceptingCallInterface,
} from './client-interceptors';
import { ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream } from './server-call';
import {
ServerUnaryCall,
ServerReadableStream,
ServerWritableStream,
ServerDuplexStream,
} from './server-call';
const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol();
@ -59,15 +59,21 @@ export interface UnaryCallback<ResponseType> {
(err: ServiceError | null, value?: ResponseType): void;
}
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface CallOptions {
deadline?: Deadline;
host?: string;
parent?: ServerUnaryCall<any, any> | ServerReadableStream<any, any> | ServerWritableStream<any, any> | ServerDuplexStream<any, any>
parent?:
| ServerUnaryCall<any, any>
| ServerReadableStream<any, any>
| ServerWritableStream<any, any>
| ServerDuplexStream<any, any>;
propagate_flags?: number;
credentials?: CallCredentials;
interceptors?: Interceptor[];
interceptor_providers?: InterceptorProvider[];
}
/* eslint-enable @typescript-eslint/no-explicit-any */
export interface CallProperties<RequestType, ResponseType> {
argument?: RequestType;
@ -76,11 +82,11 @@ export interface CallProperties<RequestType, ResponseType> {
channel: Channel;
methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
callOptions: CallOptions;
callback?: UnaryCallback<ResponseType>
callback?: UnaryCallback<ResponseType>;
}
export interface CallInvocationTransformer {
(callProperties: CallProperties<any, any>): CallProperties<any, any>
(callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
}
export type ClientOptions = Partial<ChannelOptions> & {
@ -123,7 +129,8 @@ export class Client {
'to the client constructor. Only one of these is allowed.'
);
}
this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = options.callInvocationTransformer;
this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
options.callInvocationTransformer;
delete options.callInvocationTransformer;
if (options.channelOverride) {
this[CHANNEL_SYMBOL] = options.channelOverride;
@ -274,17 +281,20 @@ export class Client {
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options,
callback: checkedArguments.callback
callback: checkedArguments.callback,
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const emitter: ClientUnaryCall = callProperties.call;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
@ -303,12 +313,12 @@ export class Client {
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(callProperties.metadata, {
onReceiveMetadata: metadata => {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage != null) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
responseMessage = message;
@ -386,17 +396,22 @@ export class Client {
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options,
callback: checkedArguments.callback
callback: checkedArguments.callback,
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<
RequestType
>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
@ -415,12 +430,12 @@ export class Client {
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(callProperties.metadata, {
onReceiveMetadata: metadata => {
onReceiveMetadata: (metadata) => {
emitter.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage != null) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
}
responseMessage = message;
@ -505,17 +520,22 @@ export class Client {
call: new ClientReadableStreamImpl<ResponseType>(deserialize),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options
callOptions: checkedArguments.options,
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<
ResponseType
>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
@ -536,7 +556,7 @@ export class Client {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (stream.push(message)) {
call.startRead();
@ -592,20 +612,29 @@ export class Client {
};
let callProperties: CallProperties<RequestType, ResponseType> = {
metadata: checkedArguments.metadata,
call: new ClientDuplexStreamImpl<RequestType, ResponseType>(serialize, deserialize),
call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
serialize,
deserialize
),
channel: this[CHANNEL_SYMBOL],
methodDefinition: methodDefinition,
callOptions: checkedArguments.options
callOptions: checkedArguments.options,
};
if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(callProperties) as CallProperties<RequestType, ResponseType>;
callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const stream: ClientDuplexStream<RequestType, ResponseType> = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
const stream: ClientDuplexStream<
RequestType,
ResponseType
> = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: callProperties.callOptions.interceptors ?? [],
callInterceptorProviders: callProperties.callOptions.interceptor_providers ?? [],
callInterceptorProviders:
callProperties.callOptions.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,

View File

@ -16,7 +16,7 @@
*/
import { Call, StatusObject } from './call-stream';
import { ConnectivityState, Channel } from './channel';
import { Channel } from './channel';
import { Status } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Metadata } from './metadata';

View File

@ -78,7 +78,7 @@ export class FilterStackFactory implements FilterFactory<FilterStack> {
createFilter(callStream: Call): FilterStack {
return new FilterStack(
this.factories.map(factory => factory.createFilter(callStream))
this.factories.map((factory) => factory.createFilter(callStream))
);
}
}

View File

@ -15,14 +15,14 @@
*
*/
import { URL, parse } from "url";
import { log } from "./logging";
import { LogVerbosity } from "./constants";
import { parseTarget } from "./resolver-dns";
import { Socket } from "net";
import { URL } from 'url';
import { log } from './logging';
import { LogVerbosity } from './constants';
import { parseTarget } from './resolver-dns';
import { Socket } from 'net';
import * as http from 'http';
import * as logging from './logging';
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from "./subchannel";
import { SubchannelAddress, isTcpSubchannelAddress } from './subchannel';
const TRACER_NAME = 'proxy';
@ -36,8 +36,8 @@ interface ProxyInfo {
}
function getProxyInfo(): ProxyInfo {
let proxyEnv: string = '';
let envVar: string = '';
let proxyEnv = '';
let envVar = '';
/* Prefer using 'grpc_proxy'. Fallback on 'http_proxy' if it is not set.
* Also prefer using 'https_proxy' with fallback on 'http_proxy'. The
* fallback behavior can be removed if there's a demand for it.
@ -62,7 +62,10 @@ function getProxyInfo(): ProxyInfo {
return {};
}
if (proxyUrl.protocol !== 'http:') {
log(LogVerbosity.ERROR, `"${proxyUrl.protocol}" scheme not supported in proxy URI`);
log(
LogVerbosity.ERROR,
`"${proxyUrl.protocol}" scheme not supported in proxy URI`
);
return {};
}
let userCred: string | null = null;
@ -75,12 +78,14 @@ function getProxyInfo(): ProxyInfo {
}
}
const result: ProxyInfo = {
address: proxyUrl.host
address: proxyUrl.host,
};
if (userCred) {
result.creds = userCred;
}
trace('Proxy server ' + result.address + ' set by environment variable ' + envVar);
trace(
'Proxy server ' + result.address + ' set by environment variable ' + envVar
);
return result;
}
@ -89,7 +94,7 @@ const PROXY_INFO = getProxyInfo();
function getNoProxyHostList(): string[] {
/* Prefer using 'no_grpc_proxy'. Fallback on 'no_proxy' if it is not set. */
let noProxyStr: string | undefined = process.env.no_grpc_proxy;
let envVar: string = 'no_grpc_proxy';
let envVar = 'no_grpc_proxy';
if (!noProxyStr) {
noProxyStr = process.env.no_proxy;
envVar = 'no_proxy';
@ -124,20 +129,37 @@ export function shouldUseProxy(target: string): boolean {
return true;
}
export function getProxiedConnection(target: string, subchannelAddress: SubchannelAddress): Promise<Socket> {
if (!(PROXY_INFO.address && shouldUseProxy(target) && isTcpSubchannelAddress(subchannelAddress))) {
export function getProxiedConnection(
target: string,
subchannelAddress: SubchannelAddress
): Promise<Socket> {
if (
!(
PROXY_INFO.address &&
shouldUseProxy(target) &&
isTcpSubchannelAddress(subchannelAddress)
)
) {
return Promise.reject<Socket>();
}
const subchannelAddressPathString = `${subchannelAddress.host}:${subchannelAddress.port}`;
trace('Using proxy ' + PROXY_INFO.address + ' to connect to ' + target + ' at ' + subchannelAddress);
trace(
'Using proxy ' +
PROXY_INFO.address +
' to connect to ' +
target +
' at ' +
subchannelAddress
);
const options: http.RequestOptions = {
method: 'CONNECT',
host: PROXY_INFO.address,
path: subchannelAddressPathString
path: subchannelAddressPathString,
};
if (PROXY_INFO.creds) {
options.headers = {
'Proxy-Authorization': 'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64')
'Proxy-Authorization':
'Basic ' + Buffer.from(PROXY_INFO.creds).toString('base64'),
};
}
return new Promise<Socket>((resolve, reject) => {
@ -146,16 +168,35 @@ export function getProxiedConnection(target: string, subchannelAddress: Subchann
request.removeAllListeners();
socket.removeAllListeners();
if (res.statusCode === 200) {
trace('Successfully connected to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address);
trace(
'Successfully connected to ' +
subchannelAddress +
' through proxy ' +
PROXY_INFO.address
);
resolve(socket);
} else {
log(LogVerbosity.ERROR, 'Failed to connect to ' + subchannelAddress + ' through proxy ' + PROXY_INFO.address);
log(
LogVerbosity.ERROR,
'Failed to connect to ' +
subchannelAddress +
' through proxy ' +
PROXY_INFO.address +
' with status ' +
res.statusCode
);
reject();
}
});
request.once('error', (err) => {
request.removeAllListeners();
log(LogVerbosity.ERROR, 'Failed to connect to proxy ' + PROXY_INFO.address);
log(
LogVerbosity.ERROR,
'Failed to connect to proxy ' +
PROXY_INFO.address +
' with error ' +
err.message
);
reject();
});
});

View File

@ -28,13 +28,19 @@ import { CallCredentials } from './call-credentials';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { CallOptions, Client, CallInvocationTransformer, CallProperties } from './client';
import {
CallOptions,
Client,
CallInvocationTransformer,
CallProperties,
} from './client';
import { LogVerbosity, Status } from './constants';
import * as logging from './logging';
import {
Deserialize,
loadPackageDefinition,
makeClientConstructor,
MethodDefinition,
Serialize,
ServiceDefinition,
} from './make-client';
@ -56,21 +62,22 @@ import {
ServerDuplexStream,
} from './server-call';
const supportedNodeVersions = require('../../package.json').engines.node;
import { engines as supportedEngines } from '../package.json';
const supportedNodeVersions = supportedEngines.node;
if (!semver.satisfies(process.version, supportedNodeVersions)) {
throw new Error(`@grpc/grpc-js only works on Node ${supportedNodeVersions}`);
}
interface IndexedObject {
[key: string]: any; // tslint:disable-line no-any
[key: number]: any; // tslint:disable-line no-any
[key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
[key: number]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
}
function mixin(...sources: IndexedObject[]) {
const result: { [key: string]: Function } = {};
for (const source of sources) {
for (const propName of Object.getOwnPropertyNames(source)) {
const property: any = source[propName]; // tslint:disable-line no-any
const property: any = source[propName]; // eslint-disable-line @typescript-eslint/no-explicit-any
if (typeof property === 'function') {
result[propName] = property;
}
@ -129,14 +136,14 @@ export const credentials = mixin(
});
}
getHeaders.then(
headers => {
(headers) => {
const metadata = new Metadata();
for (const key of Object.keys(headers)) {
metadata.add(key, headers[key]);
}
callback(null, metadata);
},
err => {
(err) => {
callback(err);
}
);
@ -202,7 +209,7 @@ export {
CallProperties,
CallInvocationTransformer,
ChannelImplementation as Channel,
Channel as ChannelInterface
Channel as ChannelInterface,
};
/**
@ -230,6 +237,7 @@ export {
ClientWritableStream,
ClientDuplexStream,
CallOptions,
MethodDefinition,
StatusObject,
ServiceError,
ServerUnaryCall,
@ -245,17 +253,17 @@ export {
export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall };
/* tslint:disable:no-any */
/* eslint-disable @typescript-eslint/no-explicit-any */
export type Call =
| ClientUnaryCall
| ClientReadableStream<any>
| ClientWritableStream<any>
| ClientDuplexStream<any, any>;
/* tslint:enable:no-any */
/* eslint-enable @typescript-eslint/no-explicit-any */
/**** Unimplemented function stubs ****/
/* tslint:disable:no-any variable-name */
/* eslint-disable @typescript-eslint/no-explicit-any */
export const loadObject = (value: any, options: any) => {
throw new Error(

View File

@ -338,11 +338,11 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.resetSubchannelList();
trace(
'Connect to address list ' +
this.latestAddressList.map(address =>
this.latestAddressList.map((address) =>
subchannelAddressToString(address)
)
);
this.subchannels = this.latestAddressList.map(address =>
this.subchannels = this.latestAddressList.map((address) =>
this.channelControlHelper.createSubchannel(address, {})
);
for (const subchannel of this.subchannels) {

View File

@ -125,7 +125,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
private calculateAndUpdateState() {
if (this.subchannelStateCounts[ConnectivityState.READY] > 0) {
const readySubchannels = this.subchannels.filter(
subchannel =>
(subchannel) =>
subchannel.getConnectivityState() === ConnectivityState.READY
);
let index = 0;
@ -192,9 +192,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.resetSubchannelList();
trace(
'Connect to address list ' +
addressList.map(address => subchannelAddressToString(address))
addressList.map((address) => subchannelAddressToString(address))
);
this.subchannels = addressList.map(address =>
this.subchannels = addressList.map((address) =>
this.channelControlHelper.createSubchannel(address, {})
);
for (const subchannel of this.subchannels) {

View File

@ -23,9 +23,9 @@
/* The any type is purposely used here. All functions validate their input at
* runtime */
/* tslint:disable:no-any */
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface RoundRobinConfig {}
export type RoundRobinConfig = {};
export interface XdsConfig {
balancerName: string;

View File

@ -48,7 +48,7 @@ export const setLoggerVerbosity = (verbosity: LogVerbosity): void => {
_logVerbosity = verbosity;
};
// tslint:disable-next-line no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const log = (severity: LogVerbosity, ...args: any[]): void => {
if (severity >= _logVerbosity && typeof _logger.error === 'function') {
_logger.error(...args);

View File

@ -18,6 +18,7 @@
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Client } from './client';
import { UntypedServiceImplementation } from './server';
export interface Serialize<T> {
(value: T): Buffer;
@ -49,10 +50,13 @@ export interface MethodDefinition<RequestType, ResponseType>
extends ClientMethodDefinition<RequestType, ResponseType>,
ServerMethodDefinition<RequestType, ResponseType> {}
export interface ServiceDefinition {
// tslint:disable-next-line no-any
[index: string]: MethodDefinition<any, any>;
}
/* eslint-disable @typescript-eslint/no-explicit-any */
export type ServiceDefinition<
ImplementationType = UntypedServiceImplementation
> = {
readonly [index in keyof ImplementationType]: MethodDefinition<any, any>;
};
/* eslint-enable @typescript-eslint/no-explicit-any */
export interface ProtobufTypeDefinition {
format: string;
@ -117,7 +121,7 @@ export function makeClientConstructor(
[methodName: string]: Function;
}
Object.keys(methods).forEach(name => {
Object.keys(methods).forEach((name) => {
const attrs = methods[name];
let methodType: keyof typeof requesterFuncs;
// TODO(murgatroid99): Verify that we don't need this anymore
@ -165,8 +169,8 @@ function partial(
serialize: Function,
deserialize: Function
): Function {
// tslint:disable-next-line:no-any
return function(this: any, ...args: any[]) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return function (this: any, ...args: any[]) {
return fn.call(this, path, serialize, deserialize, ...args);
};
}
@ -194,7 +198,7 @@ export function loadPackageDefinition(
): GrpcObject {
const result: GrpcObject = {};
for (const serviceFqn in packageDef) {
if (packageDef.hasOwnProperty(serviceFqn)) {
if (Object.prototype.hasOwnProperty.call(packageDef, serviceFqn)) {
const service = packageDef[serviceFqn];
const nameComponents = serviceFqn.split('.');
const serviceName = nameComponents[nameComponents.length - 1];

View File

@ -48,7 +48,7 @@ function validate(key: string, value?: MetadataValue): void {
if (!isLegalKey(key)) {
throw new Error('Metadata key "' + key + '" contains illegal characters');
}
if (value != null) {
if (value !== null && value !== undefined) {
if (isBinaryKey(key)) {
if (!(value instanceof Buffer)) {
throw new Error("keys that end with '-bin' must have Buffer values");
@ -178,7 +178,7 @@ export class Metadata {
const newInternalRepr = newMetadata.internalRepr;
this.internalRepr.forEach((value, key) => {
const clonedValue: MetadataValue[] = value.map(v => {
const clonedValue: MetadataValue[] = value.map((v) => {
if (v instanceof Buffer) {
return Buffer.from(v);
} else {
@ -226,7 +226,7 @@ export class Metadata {
this.internalRepr.forEach((values, key) => {
// We assume that the user's interaction with this object is limited to
// through its public API (i.e. keys and values are already validated).
result[key] = values.map(value => {
result[key] = values.map((value) => {
if (value instanceof Buffer) {
return value.toString('base64');
} else {
@ -249,7 +249,7 @@ export class Metadata {
*/
static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata {
const result = new Metadata();
Object.keys(headers).forEach(key => {
Object.keys(headers).forEach((key) => {
// Reserved headers (beginning with `:`) are not valid keys.
if (key.charAt(0) === ':') {
return;
@ -260,12 +260,12 @@ export class Metadata {
try {
if (isBinaryKey(key)) {
if (Array.isArray(values)) {
values.forEach(value => {
values.forEach((value) => {
result.add(key, Buffer.from(value, 'base64'));
});
} else if (values !== undefined) {
if (isCustomMetadata(key)) {
values.split(',').forEach(v => {
values.split(',').forEach((v) => {
result.add(key, Buffer.from(v.trim(), 'base64'));
});
} else {
@ -274,12 +274,12 @@ export class Metadata {
}
} else {
if (Array.isArray(values)) {
values.forEach(value => {
values.forEach((value) => {
result.add(key, value);
});
} else if (values !== undefined) {
if (isCustomMetadata(key)) {
values.split(',').forEach(v => result.add(key, v.trim()));
values.split(',').forEach((v) => result.add(key, v.trim()));
} else {
result.add(key, values);
}

View File

@ -18,7 +18,7 @@
import { Duplex, Readable, Writable } from 'stream';
import { EmitterAugmentation1 } from './events';
// tslint:disable:no-any
/* eslint-disable @typescript-eslint/no-explicit-any */
export type WriteCallback = (error: Error | null | undefined) => void;

View File

@ -23,7 +23,6 @@ import {
import * as dns from 'dns';
import * as util from 'util';
import { extractAndSelectServiceConfig, ServiceConfig } from './service-config';
import { ServiceError } from './call';
import { Status } from './constants';
import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
@ -109,7 +108,7 @@ function mergeArrays<T>(...arrays: T[][]): T[] {
i <
Math.max.apply(
null,
arrays.map(array => array.length)
arrays.map((array) => array.length)
);
i++
) {
@ -186,50 +185,56 @@ class DnsResolver implements Resolver {
* if the name exists but there are no records for that family, and that
* error is indistinguishable from other kinds of errors */
this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true });
this.pendingLookupPromise.then(addressList => {
this.pendingLookupPromise = null;
const ip4Addresses: dns.LookupAddress[] = addressList.filter(
addr => addr.family === 4
);
const ip6Addresses: dns.LookupAddress[] = addressList.filter(addr => addr.family === 6);
this.latestLookupResult = mergeArrays(
ip6Addresses,
ip4Addresses
).map(addr => ({ host: addr.address, port: +this.port! }));
const allAddressesString: string =
'[' +
this.latestLookupResult.map(addr => addr.host + ':' + addr.port).join(',') +
']';
trace(
'Resolved addresses for target ' +
this.target +
': ' +
allAddressesString
);
if (this.latestLookupResult.length === 0) {
this.pendingLookupPromise.then(
(addressList) => {
this.pendingLookupPromise = null;
const ip4Addresses: dns.LookupAddress[] = addressList.filter(
(addr) => addr.family === 4
);
const ip6Addresses: dns.LookupAddress[] = addressList.filter(
(addr) => addr.family === 6
);
this.latestLookupResult = mergeArrays(
ip6Addresses,
ip4Addresses
).map((addr) => ({ host: addr.address, port: +this.port! }));
const allAddressesString: string =
'[' +
this.latestLookupResult
.map((addr) => addr.host + ':' + addr.port)
.join(',') +
']';
trace(
'Resolved addresses for target ' +
this.target +
': ' +
allAddressesString
);
if (this.latestLookupResult.length === 0) {
this.listener.onError(this.defaultResolutionError);
return;
}
/* If the TXT lookup has not yet finished, both of the last two
* arguments will be null, which is the equivalent of getting an
* empty TXT response. When the TXT lookup does finish, its handler
* can update the service config by using the same address list */
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
);
},
(err) => {
trace(
'Resolution error for target ' +
this.target +
': ' +
(err as Error).message
);
this.pendingLookupPromise = null;
this.listener.onError(this.defaultResolutionError);
return;
}
/* If the TXT lookup has not yet finished, both of the last two
* arguments will be null, which is the equivalent of getting an
* empty TXT response. When the TXT lookup does finish, its handler
* can update the service config by using the same address list */
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
);
},
err => {
trace(
'Resolution error for target ' +
this.target +
': ' +
(err as Error).message
);
this.pendingLookupPromise = null;
this.listener.onError(this.defaultResolutionError);
});
);
/* If there already is a still-pending TXT resolution, we can just use
* that result when it comes in */
if (this.pendingTxtPromise === null) {
@ -237,45 +242,48 @@ class DnsResolver implements Resolver {
* the name resolution attempt as a whole is a success even if the TXT
* lookup fails */
this.pendingTxtPromise = resolveTxtPromise(hostname);
this.pendingTxtPromise.then(txtRecord => {
this.pendingTxtPromise = null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
txtRecord,
this.percentage
);
} catch (err) {
this.pendingTxtPromise.then(
(txtRecord) => {
this.pendingTxtPromise = null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
txtRecord,
this.percentage
);
} catch (err) {
this.latestServiceConfigError = {
code: Status.UNAVAILABLE,
details: 'Parsing service config failed',
metadata: new Metadata(),
};
}
if (this.latestLookupResult !== null) {
/* We rely here on the assumption that calling this function with
* identical parameters will be essentialy idempotent, and calling
* it with the same address list and a different service config
* should result in a fast and seamless switchover. */
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
);
}
},
(err) => {
this.latestServiceConfigError = {
code: Status.UNAVAILABLE,
details: 'Parsing service config failed',
details: 'TXT query failed',
metadata: new Metadata(),
};
if (this.latestLookupResult !== null) {
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
);
}
}
if (this.latestLookupResult !== null) {
/* We rely here on the assumption that calling this function with
* identical parameters will be essentialy idempotent, and calling
* it with the same address list and a different service config
* should result in a fast and seamless switchover. */
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
)
}
}, err => {
this.latestServiceConfigError = {
code: Status.UNAVAILABLE,
details: 'TXT query failed',
metadata: new Metadata(),
};
if (this.latestLookupResult !== null) {
this.listener.onSuccessfulResolution(
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError
)
}
});
);
}
}
}
@ -298,16 +306,10 @@ class DnsResolver implements Resolver {
IPV6_REGEX.exec(target) ||
IPV6_BRACKET_REGEX.exec(target);
if (ipMatch) {
if (ipMatch[2]) {
return ipMatch[1] + ':' + ipMatch[2];
}
return ipMatch[1];
}
const dnsMatch = DNS_REGEX.exec(target);
if (dnsMatch) {
if (dnsMatch[2]) {
return dnsMatch[1] + ':' + dnsMatch[2];
}
return dnsMatch[1];
}
throw new Error(`Failed to parse target ${target}`);
@ -323,17 +325,21 @@ export function setup(): void {
registerDefaultResolver(DnsResolver);
}
export interface dnsUrl {
export interface DnsUrl {
host: string;
port?: string;
}
export function parseTarget(target: string): dnsUrl | null {
const match = IPV4_REGEX.exec(target) ?? IPV6_REGEX.exec(target) ?? IPV6_BRACKET_REGEX.exec(target) ?? DNS_REGEX.exec(target)
export function parseTarget(target: string): DnsUrl | null {
const match =
IPV4_REGEX.exec(target) ??
IPV6_REGEX.exec(target) ??
IPV6_BRACKET_REGEX.exec(target) ??
DNS_REGEX.exec(target);
if (match) {
return {
host: match[1],
port: match[2] ?? undefined
port: match[2] ?? undefined,
};
} else {
return null;

View File

@ -15,7 +15,6 @@
*
*/
import { ServiceError } from './call';
import { ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';

View File

@ -158,7 +158,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.on('error', err => {
this.on('error', (err) => {
this.call.sendError(err);
this.end();
});
@ -175,7 +175,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
async _write(
chunk: ResponseType,
encoding: string,
// tslint:disable-next-line:no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callback: (...args: any[]) => void
) {
try {
@ -202,7 +202,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
callback(null);
}
// tslint:disable-next-line:no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
this.trailingMetadata = metadata;
@ -229,7 +229,7 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.on('error', err => {
this.on('error', (err) => {
this.call.sendError(err);
this.end();
});
@ -497,7 +497,7 @@ export class Http2ServerCallStream<
}
if (err) {
if (!err.hasOwnProperty('metadata')) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) {
err.metadata = metadata;
}
this.sendError(err);
@ -590,7 +590,7 @@ export class Http2ServerCallStream<
}
setupSurfaceCall(call: ServerSurfaceCall) {
this.once('cancelled', reason => {
this.once('cancelled', (reason) => {
call.cancelled = true;
call.emit('cancelled', reason);
});
@ -704,7 +704,7 @@ export class Http2ServerCallStream<
}
}
// tslint:disable:no-any
/* eslint-disable @typescript-eslint/no-explicit-any */
type UntypedServerCall = Http2ServerCallStream<any, any>;
function handleExpiredDeadline(call: UntypedServerCall) {

View File

@ -16,8 +16,7 @@
*/
import * as http2 from 'http2';
import { AddressInfo, ListenOptions } from 'net';
import { URL } from 'url';
import { AddressInfo } from 'net';
import { ServiceError } from './call';
import { Status, LogVerbosity } from './constants';
@ -48,7 +47,11 @@ import { ServerCredentials } from './server-credentials';
import { ChannelOptions } from './channel-options';
import { createResolver, ResolverListener } from './resolver';
import { log } from './logging';
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel';
import {
SubchannelAddress,
TcpSubchannelAddress,
isTcpSubchannelAddress,
} from './subchannel';
interface BindResult {
port: number;
@ -67,7 +70,7 @@ function getUnimplementedStatusResponse(
};
}
// tslint:disable:no-any
/* eslint-disable @typescript-eslint/no-explicit-any */
type UntypedUnaryHandler = UnaryHandler<any, any>;
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
@ -109,11 +112,10 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) {
throw new Error(`Invalid handlerType ${handlerType}`);
}
}
// tslint:enable:no-any
export class Server {
private http2ServerList: (http2.Http2Server | http2.Http2SecureServer)[] = [];
private handlers: Map<string, UntypedHandler> = new Map<
string,
UntypedHandler
@ -153,7 +155,7 @@ export class Server {
throw new Error('Cannot add an empty service to a server');
}
serviceKeys.forEach(name => {
serviceKeys.forEach((name) => {
const attrs = service[name];
let methodType: HandlerType;
@ -245,62 +247,72 @@ export class Server {
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
}
};
const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise<BindResult> => {
const bindSpecificPort = (
addressList: SubchannelAddress[],
portNum: number,
previousCount: number
): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve({port: portNum, count: previousCount});
return Promise.resolve({ port: portNum, count: previousCount });
}
return Promise.all(addressList.map(address => {
let addr: SubchannelAddress;
if (isTcpSubchannelAddress(address)) {
addr = {
host: (address as TcpSubchannelAddress).host,
port: portNum
};
} else {
addr = address
}
const http2Server = setupServer();
return new Promise<number|Error>((resolve, reject) => {
function onError(err: Error): void {
resolve(err);
return Promise.all(
addressList.map((address) => {
let addr: SubchannelAddress;
if (isTcpSubchannelAddress(address)) {
addr = {
host: (address as TcpSubchannelAddress).host,
port: portNum,
};
} else {
addr = address;
}
http2Server.once('error', onError);
http2Server.listen(addr, () => {
this.http2ServerList.push(http2Server);
const boundAddress = http2Server.address()!;
if (typeof boundAddress === 'string') {
resolve(portNum);
} else {
resolve(boundAddress.port);
const http2Server = setupServer();
return new Promise<number | Error>((resolve, reject) => {
function onError(err: Error): void {
resolve(err);
}
http2Server.removeListener('error', onError);
http2Server.once('error', onError);
http2Server.listen(addr, () => {
this.http2ServerList.push(http2Server);
const boundAddress = http2Server.address()!;
if (typeof boundAddress === 'string') {
resolve(portNum);
} else {
resolve(boundAddress.port);
}
http2Server.removeListener('error', onError);
});
});
})
})).then(results => {
).then((results) => {
let count = 0;
for (const result of results) {
if (typeof result === 'number') {
count += 1;
if (result !== portNum) {
throw new Error('Invalid state: multiple port numbers added from single address');
throw new Error(
'Invalid state: multiple port numbers added from single address'
);
}
}
}
return {
port: portNum,
count: count + previousCount
count: count + previousCount,
};
});
}
};
const bindWildcardPort = (addressList: SubchannelAddress[]): Promise<BindResult> => {
const bindWildcardPort = (
addressList: SubchannelAddress[]
): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve<BindResult>({port: 0, count: 0});
return Promise.resolve<BindResult>({ port: 0, count: 0 });
}
const address = addressList[0];
const http2Server = setupServer();
@ -313,16 +325,26 @@ export class Server {
http2Server.listen(address, () => {
this.http2ServerList.push(http2Server);
resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1));
resolve(
bindSpecificPort(
addressList.slice(1),
(http2Server.address() as AddressInfo).port,
1
)
);
http2Server.removeListener('error', onError);
});
});
}
};
const resolverListener: ResolverListener = {
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {}
resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) {
callback(new Error(`No addresses resolved for port ${port}`), 0);
return;
@ -332,32 +354,42 @@ export class Server {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
}
} else{
} else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(bindResult => {
if (bindResult.count === 0) {
bindResultPromise.then(
(bindResult) => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
callback(null, bindResult.port);
}
},
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
}
callback(null, bindResult.port);
}
}, (error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
});
);
},
onError: (error) => {
callback(new Error(error.details), 0);
}
},
};
const resolver = createResolver(port, resolverListener);
@ -377,10 +409,10 @@ export class Server {
// Always destroy any available sessions. It's possible that one or more
// tryShutdown() calls are in progress. Don't wait on them to finish.
this.sessions.forEach(session => {
this.sessions.forEach((session) => {
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
// recognize destroy(code) as a valid signature.
// tslint:disable-next-line:no-any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
});
this.sessions.clear();
@ -407,7 +439,12 @@ export class Server {
}
start(): void {
if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) {
if (
this.http2ServerList.length === 0 ||
this.http2ServerList.every(
(http2Server) => http2Server.listening !== true
)
) {
throw new Error('server must be bound in order to start');
}
@ -441,7 +478,7 @@ export class Server {
// If any sessions are active, close them gracefully.
pendingChecks += this.sessions.size;
this.sessions.forEach(session => {
this.sessions.forEach((session) => {
session.close(maybeCallback);
});
if (pendingChecks === 0) {
@ -453,7 +490,9 @@ export class Server {
throw new Error('Not yet implemented');
}
private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void {
private _setupHandlers(
http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
if (http2Server === null) {
return;
}
@ -527,7 +566,7 @@ export class Server {
}
);
http2Server.on('session', session => {
http2Server.on('session', (session) => {
if (!this.started) {
session.destroy();
return;

View File

@ -24,7 +24,7 @@
/* The any type is purposely used here. All functions validate their input at
* runtime */
/* tslint:disable:no-any */
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as lbconfig from './load-balancing-config';
import * as os from 'os';

View File

@ -63,12 +63,12 @@ export class SubchannelPool {
/* These objects are created with Object.create(null), so they do not
* have a prototype, which means that for (... in ...) loops over them
* do not need to be filtered */
// tslint:disable-next-line:forin
// eslint-disable-disable-next-line:forin
for (const channelTarget in this.pool) {
const subchannelObjArray = this.pool[channelTarget];
const refedSubchannels = subchannelObjArray.filter(
value => !value.subchannel.unrefIfOneRef()
(value) => !value.subchannel.unrefIfOneRef()
);
if (refedSubchannels.length > 0) {

View File

@ -20,7 +20,7 @@ import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import { Http2CallStream } from './call-stream';
import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity, TLSSocket } from 'tls';
import { PeerCertificate, checkServerIdentity } from 'tls';
import { ConnectivityState } from './channel';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import { getDefaultAuthority } from './resolver';
@ -29,7 +29,7 @@ import { LogVerbosity } from './constants';
import { shouldUseProxy, getProxiedConnection } from './http_proxy';
import * as net from 'net';
const { version: clientVersion } = require('../../package.json');
import { version as clientVersion } from '../package.json';
const TRACER_NAME = 'subchannel';
@ -210,7 +210,7 @@ export class Subchannel {
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter(e => e)
.filter((e) => e)
.join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) {
@ -311,8 +311,8 @@ export class Subchannel {
return socket;
} else {
/* net.NetConnectOpts is declared in a way that is more restrictive
* than what net.connect will actually accept, so we use the type
* assertion to work around that. */
* than what net.connect will actually accept, so we use the type
* assertion to work around that. */
return net.connect(this.subchannelAddress);
}
};
@ -397,7 +397,7 @@ export class Subchannel {
}
}
);
session.once('error', error => {
session.once('error', (error) => {
/* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */
trace(
@ -410,11 +410,17 @@ export class Subchannel {
private startConnectingInternal() {
if (shouldUseProxy(this.channelTarget)) {
getProxiedConnection(this.channelTarget, this.subchannelAddress).then((socket) => {
this.createSession(socket);
}, (reason) => {
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE);
});
getProxiedConnection(this.channelTarget, this.subchannelAddress).then(
(socket) => {
this.createSession(socket);
},
(reason) => {
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.TRANSIENT_FAILURE
);
}
);
} else {
this.createSession();
}
@ -589,7 +595,7 @@ export class Subchannel {
const http2Stream = this.session!.request(headers);
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n'
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
trace('Starting stream with headers\n' + headersString);
callStream.attachHttp2Stream(http2Stream, this);

View File

@ -32,7 +32,6 @@ export function mockFunction(): never {
throw new Error('Not implemented');
}
// tslint:disable-next-line:no-namespace
export namespace assert2 {
const toCall = new Map<() => void, number>();
const afterCallsQueue: Array<() => void> = [];
@ -96,7 +95,6 @@ export namespace assert2 {
return result;
};
}
// tslint:enable:no-any
/**
* Calls the given function when every function that was wrapped with

View File

@ -5,6 +5,7 @@
"outDir": "build",
"target": "es2017",
"module": "commonjs",
"resolveJsonModule": true,
"incremental": true
},
"include": [

View File

@ -1,8 +0,0 @@
{
"extends": "gts/tslint.json",
"linterOptions": {
"exclude": [
"**/*.json"
]
}
}