Merge branch 'master' into grpc-js-xds_http_filters

This commit is contained in:
murgatroid99 2021-07-15 16:36:27 -07:00
commit ae2cb672b2
51 changed files with 958 additions and 271 deletions

38
TROUBLESHOOTING.md Normal file
View File

@ -0,0 +1,38 @@
# Troubleshooting grpc-js
This guide is for troubleshooting the `grpc-js` library for Node.js
## Enabling extra logging and tracing
Extra logging can be very useful for diagnosing problems. `grpc-js` supporst
the `GRPC_VERBOSITY` and `GRPC_TRACE` environment variables that can be used to increase the amount of information
that gets printed to stderr.
## GRPC_VERBOSITY
`GRPC_VERBOSITY` is used to set the minimum level of log messages printed by gRPC (supported values are `DEBUG`, `INFO` and `ERROR`). If this environment variable is unset, only `ERROR` logs will be printed.
## GRPC_TRACE
`GRPC_TRACE` can be used to enable extra logging for some internal gRPC components. Enabling the right traces can be invaluable
for diagnosing for what is going wrong when things aren't working as intended. Possible values for `GRPC_TRACE` are listed in [Environment Variables Overview](doc/environment_variables.md).
Multiple traces can be enabled at once (use comma as separator).
```
# Enable debug logs for an application
GRPC_VERBOSITY=debug ./helloworld_application_using_grpc
```
```
# Print information about channel state changes
GRPC_VERBOSITY=debug GRPC_TRACE=connectivity_state ./helloworld_application_using_grpc
```
```
# Print info from 3 different tracers, including tracing logs with log level DEBUG
GRPC_VERBOSITY=debug GRPC_TRACE=channel,subchannel,call_stream ./helloworld_application_using_grpc
```
Please note that the `GRPC_TRACE` environment variable has nothing to do with gRPC's "tracing" feature (= tracing RPCs in
microservice environment to gain insight about how requests are processed by deployment), it is merely used to enable printing
of extra logs.

View File

@ -0,0 +1,58 @@
# grpc-js environment variables
`@grpc/grpc-js` exposes some configuration as environment variables that
can be set.
*For the legacy `grpc` library, the environment variables are documented
[in the main gRPC repository](https://github.com/grpc/grpc/blob/master/doc/environment_variables.md)*
* grpc_proxy, https_proxy, http_proxy
The URI of the proxy to use for HTTP CONNECT support. These variables are
checked in order, and the first one that has a value is used.
* no_grpc_proxy, no_proxy
A comma separated list of hostnames to connect to without using a proxy even
if a proxy is set. These variables are checked in order, and the first one
that has a value is used.
* GRPC_SSL_CIPHER_SUITES
A colon separated list of cipher suites to use with OpenSSL
Defaults to the defaults for Node.js
* GRPC_DEFAULT_SSL_ROOTS_FILE_PATH
PEM file to load SSL roots from
* GRPC_NODE_TRACE, GRPC_TRACE
A comma separated list of tracers that provide additional insight into how
grpc-js is processing requests via debug logs. Available tracers include:
- `call_stream` - Traces client request internals
- `channel` - Traces channel events
- `connectivity_state` - Traces channel connectivity state changes
- `dns_resolver` - Traces DNS resolution
- `pick_first` - Traces the pick first load balancing policy
- `proxy` - Traces proxy operations
- `resolving_load_balancer` - Traces the resolving load balancer
- `round_robin` - Traces the round robin load balancing policy
- `server` - Traces high-level server events
- `server_call` - Traces server handling of individual requests
- `subchannel` - Traces subchannel connectivity state and errors
- `subchannel_refcount` - Traces subchannel refcount changes
The following tracers are added by the `@grpc/grpc-js-xds` library:
- `cds_balancer` - Traces the CDS load balancing policy
- `eds_balancer` - Traces the EDS load balancing policy
- `priority` - Traces the priority load balancing policy
- `weighted_target` - Traces the weighted target load balancing policy
- `xds_client` - Traces the xDS Client
- `xds_cluster_manager` - Traces the xDS cluster manager load balancing policy
- `xds_resolver` - Traces the xDS name resolver
'all' can additionally be used to turn all traces on.
Individual traces can be disabled by prefixing them with '-'.
* GRPC_NODE_VERBOSITY, GRPC_VERBOSITY
Default gRPC logging verbosity - one of:
- DEBUG - log all gRPC messages
- INFO - log INFO and ERROR message
- ERROR - log only errors (default)
- NONE - won't log any

View File

@ -21,4 +21,5 @@ const client = new MyServiceClient('xds:///example.com:123');
## Supported Features
- [xDS-Based Global Load Balancing](https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md)
- [xDS-Based Global Load Balancing](https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md)
- [xDS traffic splitting and routing](https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md)

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.2.4",
"version": "1.3.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
@ -47,7 +47,7 @@
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.10"
"@grpc/grpc-js": "~1.3.0"
},
"engines": {
"node": ">=10.10.0"

View File

@ -197,8 +197,8 @@ export class PathExactValueMatcher {
export class PathSafeRegexValueMatcher {
private targetRegexImpl: RE2;
constructor(targetRegex: string, caseInsensitive: boolean) {
this.targetRegexImpl = new RE2(`^${targetRegex}$`, caseInsensitive ? 'iu' : 'u');
constructor(targetRegex: string) {
this.targetRegexImpl = new RE2(`^${targetRegex}$`, 'u');
}
apply(value: string) {

View File

@ -177,7 +177,7 @@ function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher {
pathMatcher = new PathExactValueMatcher(routeMatch.path!, caseInsensitive);
break;
case 'safe_regex':
pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex, caseInsensitive);
pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex);
break;
default:
pathMatcher = new RejectValueMatcher();

View File

@ -25,6 +25,8 @@ Documentation specifically for the `@grpc/grpc-js` package is currently not avai
- Connection Keepalives
- HTTP Connect support (proxies)
If you need a feature from the `grpc` package that is not provided by the `@grpc/grpc-js`, please file a feature request with that information.
This library does not directly handle `.proto` files. To use `.proto` files with this library we recommend using the `@grpc/proto-loader` package.
## Migrating from [`grpc`](https://www.npmjs.com/package/grpc)

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.2.12",
"version": "1.3.5",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -29,6 +29,7 @@
"gulp": "^4.0.2",
"gulp-mocha": "^6.0.0",
"lodash": "^4.17.4",
"madge": "^5.0.1",
"mocha-jenkins-reporter": "^0.4.1",
"ncp": "^2.0.0",
"pify": "^4.0.1",
@ -53,7 +54,7 @@
"check": "gts check src/**/*.ts",
"fix": "gts fix src/*.ts",
"pretest": "npm run compile",
"posttest": "npm run check"
"posttest": "npm run check && madge -c ./build/src"
},
"dependencies": {
"@types/node": ">=12.12.47"

View File

@ -66,7 +66,9 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
Status.INTERNAL,
'"authorization" metadata cannot have multiple values'
);
return Promise.reject<Metadata>('"authorization" metadata cannot have multiple values');
return Promise.reject<Metadata>(
'"authorization" metadata cannot have multiple values'
);
}
return resultMetadata;
}

View File

@ -57,7 +57,7 @@ interface SystemError extends Error {
* Should do approximately the same thing as util.getSystemErrorName but the
* TypeScript types don't have that function for some reason so I just made my
* own.
* @param errno
* @param errno
*/
function getSystemErrorName(errno: number): string {
for (const [name, num] of Object.entries(os.constants.errno)) {
@ -71,9 +71,10 @@ function getSystemErrorName(errno: number): string {
export type Deadline = Date | number;
function getMinDeadline(deadlineList: Deadline[]): Deadline {
let minValue: number = Infinity;
let minValue = Infinity;
for (const deadline of deadlineList) {
const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
const deadlineMsecs =
deadline instanceof Date ? deadline.getTime() : deadline;
if (deadlineMsecs < minValue) {
minValue = deadlineMsecs;
}
@ -265,7 +266,10 @@ export class Http2CallStream implements Call {
metadata: new Metadata(),
});
};
if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) {
if (
this.options.parentCall &&
this.options.flags & Propagate.CANCELLATION
) {
this.options.parentCall.on('cancelled', () => {
this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
});
@ -548,7 +552,7 @@ export class Http2CallStream implements Call {
stream.on('close', () => {
/* Use process.next tick to ensure that this code happens after any
* "error" event that may be emitted at about the same time, so that
* we can bubble up the error message from that event. */
* we can bubble up the error message from that event. */
process.nextTick(() => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
@ -597,7 +601,7 @@ export class Http2CallStream implements Call {
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
} else {
if (this.internalError.errno === os.constants.errno.ECONNRESET) {
if (this.internalError.code === 'ECONNRESET') {
code = Status.UNAVAILABLE;
details = this.internalError.message;
} else {
@ -629,7 +633,16 @@ export class Http2CallStream implements Call {
* https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
*/
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
this.trace('Node error event: message=' + err.message + ' code=' + err.code + ' errno=' + getSystemErrorName(err.errno) + ' syscall=' + err.syscall);
this.trace(
'Node error event: message=' +
err.message +
' code=' +
err.code +
' errno=' +
getSystemErrorName(err.errno) +
' syscall=' +
err.syscall
);
this.internalError = err;
}
});

View File

@ -81,7 +81,8 @@ export function callErrorFromStatus(status: StatusObject): ServiceError {
return Object.assign(new Error(message), status);
}
export class ClientUnaryCallImpl extends EventEmitter
export class ClientUnaryCallImpl
extends EventEmitter
implements ClientUnaryCall {
public call?: InterceptingCallInterface;
constructor() {
@ -97,7 +98,8 @@ export class ClientUnaryCallImpl extends EventEmitter
}
}
export class ClientReadableStreamImpl<ResponseType> extends Readable
export class ClientReadableStreamImpl<ResponseType>
extends Readable
implements ClientReadableStream<ResponseType> {
public call?: InterceptingCallInterface;
constructor(readonly deserialize: (chunk: Buffer) => ResponseType) {
@ -117,7 +119,8 @@ export class ClientReadableStreamImpl<ResponseType> extends Readable
}
}
export class ClientWritableStreamImpl<RequestType> extends Writable
export class ClientWritableStreamImpl<RequestType>
extends Writable
implements ClientWritableStream<RequestType> {
public call?: InterceptingCallInterface;
constructor(readonly serialize: (value: RequestType) => Buffer) {
@ -149,7 +152,8 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
}
}
export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
export class ClientDuplexStreamImpl<RequestType, ResponseType>
extends Duplex
implements ClientDuplexStream<RequestType, ResponseType> {
public call?: InterceptingCallInterface;
constructor(

View File

@ -37,6 +37,7 @@ export interface ChannelOptions {
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}

View File

@ -33,9 +33,14 @@ import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import {
CallConfig,
ConfigSelector,
getDefaultAuthority,
mapUriDefaultScheme,
} from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
@ -43,13 +48,7 @@ import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';
import { Filter } from './filter';
export enum ConnectivityState {
IDLE,
CONNECTING,
READY,
TRANSIENT_FAILURE,
SHUTDOWN,
}
import { ConnectivityState } from './connectivity-state';
/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -262,8 +261,8 @@ export class ChannelImplementation implements Channel {
process.nextTick(() => {
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref()
for (const {callStream, callMetadata} of localQueue) {
this.callRefTimerUnref();
for (const { callStream, callMetadata } of localQueue) {
this.tryGetConfig(callStream, callMetadata);
}
this.configSelectionQueue = [];
@ -271,15 +270,21 @@ export class ChannelImplementation implements Channel {
},
(status) => {
if (this.configSelectionQueue.length > 0) {
trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection');
trace(
LogVerbosity.DEBUG,
'channel',
'Name resolution failed for target ' +
uriToString(this.target) +
' with calls queued for config selection'
);
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref();
for (const {callStream, callMetadata} of localQueue) {
for (const { callStream, callMetadata } of localQueue) {
if (callMetadata.getOptions().waitForReady) {
this.callRefTimerRef();
this.configSelectionQueue.push({callStream, callMetadata});
this.configSelectionQueue.push({ callStream, callMetadata });
} else {
callStream.cancelWithStatus(status.code, status.details);
}
@ -297,20 +302,39 @@ export class ChannelImplementation implements Channel {
private callRefTimerRef() {
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef?.()) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
trace(
LogVerbosity.DEBUG,
'channel',
'callRefTimer.ref | configSelectionQueue.length=' +
this.configSelectionQueue.length +
' pickQueue.length=' +
this.pickQueue.length
);
this.callRefTimer.ref?.();
}
}
private callRefTimerUnref() {
// If the hasRef function does not exist, always run the code
if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
trace(
LogVerbosity.DEBUG,
'channel',
'callRefTimer.unref | configSelectionQueue.length=' +
this.configSelectionQueue.length +
' pickQueue.length=' +
this.pickQueue.length
);
this.callRefTimer.unref?.();
}
}
private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[]) {
private pushPick(
callStream: Http2CallStream,
callMetadata: Metadata,
callConfig: CallConfig,
dynamicFilters: Filter[]
) {
this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
this.callRefTimerRef();
}
@ -322,11 +346,16 @@ export class ChannelImplementation implements Channel {
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig, dynamicFilters: Filter[]) {
if (callStream.getStatus() !== null) {
return;
}
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation });
private tryPick(
callStream: Http2CallStream,
callMetadata: Metadata,
callConfig: CallConfig,
dynamicFilters: Filter[]
) {
const pickResult = this.currentPicker.pick({
metadata: callMetadata,
extraPickInfo: callConfig.pickInformation,
});
trace(
LogVerbosity.DEBUG,
'channel',
@ -425,7 +454,9 @@ export class ChannelImplementation implements Channel {
);
callStream.cancelWithStatus(
Status.INTERNAL,
`Failed to start HTTP/2 stream with error: ${(error as Error).message}`
`Failed to start HTTP/2 stream with error: ${
(error as Error).message
}`
);
}
}
@ -447,7 +478,7 @@ export class ChannelImplementation implements Channel {
(error: Error & { code: number }) => {
// We assume the error code isn't 0 (Status.OK)
callStream.cancelWithStatus(
(typeof error.code === 'number') ? error.code : Status.UNKNOWN,
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
}
@ -505,7 +536,7 @@ export class ChannelImplementation implements Channel {
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
if (newState !== watcherObject.currentState) {
if(watcherObject.timer) {
if (watcherObject.timer) {
clearTimeout(watcherObject.timer);
}
this.removeConnectivityStateWatcher(watcherObject);
@ -526,9 +557,9 @@ export class ChannelImplementation implements Channel {
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
* because it now has a pending request. */
this.resolvingLoadBalancer.exitIdle();
this.configSelectionQueue.push({
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata
callMetadata: metadata,
});
this.callRefTimerRef();
} else {
@ -536,8 +567,13 @@ export class ChannelImplementation implements Channel {
if (callConfig.status === Status.OK) {
if (callConfig.methodConfig.timeout) {
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds);
deadline.setMilliseconds(deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1_000_000);
deadline.setSeconds(
deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
);
deadline.setMilliseconds(
deadline.getMilliseconds() +
callConfig.methodConfig.timeout.nanos / 1_000_000
);
stream.setConfigDeadline(deadline);
// Refreshing the filters makes the deadline filter pick up the new deadline
stream.filterStack.refresh();
@ -564,7 +600,10 @@ export class ChannelImplementation implements Channel {
this.tryPick(stream, metadata, callConfig, []);
}
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
stream.cancelWithStatus(
callConfig.status,
'Failed to route call to method ' + stream.getMethod()
);
}
}
}
@ -602,7 +641,7 @@ export class ChannelImplementation implements Channel {
throw new Error('Channel has been shut down');
}
let timer = null;
if(deadline !== Infinity) {
if (deadline !== Infinity) {
const deadlineDate: Date =
deadline instanceof Date ? deadline : new Date(deadline);
const now = new Date();
@ -618,12 +657,12 @@ export class ChannelImplementation implements Channel {
callback(
new Error('Deadline passed without connectivity state change')
);
}, deadlineDate.getTime() - now.getTime())
}, deadlineDate.getTime() - now.getTime());
}
const watcherObject = {
currentState,
callback,
timer
timer,
};
this.connectivityStateWatchers.push(watcherObject);
}

View File

@ -348,7 +348,10 @@ class BaseInterceptingCall implements InterceptingCallInterface {
try {
serialized = this.methodDefinition.requestSerialize(message);
} catch (e) {
this.call.cancelWithStatus(Status.INTERNAL, `Request message serialization failure: ${e.message}`);
this.call.cancelWithStatus(
Status.INTERNAL,
`Request message serialization failure: ${e.message}`
);
return;
}
this.call.sendMessageWithContext(context, serialized);
@ -403,7 +406,8 @@ class BaseInterceptingCall implements InterceptingCallInterface {
* BaseInterceptingCall with special-cased behavior for methods with unary
* responses.
*/
class BaseUnaryInterceptingCall extends BaseInterceptingCall
class BaseUnaryInterceptingCall
extends BaseInterceptingCall
implements InterceptingCallInterface {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
@ -435,7 +439,8 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall
* BaseInterceptingCall with special-cased behavior for methods with streaming
* responses.
*/
class BaseStreamingInterceptingCall extends BaseInterceptingCall
class BaseStreamingInterceptingCall
extends BaseInterceptingCall
implements InterceptingCallInterface {}
function getBottomInterceptingCall(

View File

@ -30,7 +30,8 @@ import {
} from './call';
import { CallCredentials } from './call-credentials';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { Channel, ChannelImplementation } from './channel';
import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
@ -55,6 +56,12 @@ const INTERCEPTOR_SYMBOL = Symbol();
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
function isFunction<ResponseType>(
arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
return typeof arg === 'function';
}
export interface UnaryCallback<ResponseType> {
(err: ServiceError | null, value?: ResponseType): void;
}
@ -198,9 +205,9 @@ export class Client {
options: CallOptions;
callback: UnaryCallback<ResponseType>;
} {
if (arg1 instanceof Function) {
if (isFunction(arg1)) {
return { metadata: new Metadata(), options: {}, callback: arg1 };
} else if (arg2 instanceof Function) {
} else if (isFunction(arg2)) {
if (arg1 instanceof Metadata) {
return { metadata: arg1, options: {}, callback: arg2 };
} else {
@ -211,7 +218,7 @@ export class Client {
!(
arg1 instanceof Metadata &&
arg2 instanceof Object &&
arg3 instanceof Function
isFunction(arg3)
)
) {
throw new Error('Incorrect arguments passed');
@ -261,9 +268,11 @@ export class Client {
options?: CallOptions | UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>
): ClientUnaryCall {
const checkedArguments = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback);
const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
metadata,
options,
callback
);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -377,9 +386,11 @@ export class Client {
options?: CallOptions | UnaryCallback<ResponseType>,
callback?: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType> {
const checkedArguments = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback);
const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
metadata,
options,
callback
);
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
@ -403,9 +414,7 @@ export class Client {
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<
RequestType
>;
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
@ -527,9 +536,7 @@ export class Client {
callProperties
) as CallProperties<RequestType, ResponseType>;
}
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<
ResponseType
>;
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
@ -654,7 +661,7 @@ export class Client {
stream.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
stream.push(message)
stream.push(message);
},
onReceiveStatus(status: StatusObject) {
if (receivedStatus) {

View File

@ -0,0 +1,24 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export enum ConnectivityState {
IDLE,
CONNECTING,
READY,
TRANSIENT_FAILURE,
SHUTDOWN,
}

View File

@ -39,6 +39,7 @@ export enum LogVerbosity {
DEBUG = 0,
INFO,
ERROR,
NONE,
}
/**
@ -51,7 +52,11 @@ export enum Propagate {
CENSUS_TRACING_CONTEXT = 4,
CANCELLATION = 8,
// https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43
DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION,
DEFAULTS = 0xffff |
Propagate.DEADLINE |
Propagate.CENSUS_STATS_CONTEXT |
Propagate.CENSUS_TRACING_CONTEXT |
Propagate.CANCELLATION,
}
// -1 means unlimited

View File

@ -42,7 +42,7 @@ function getDeadline(deadline: number) {
export class DeadlineFilter extends BaseFilter implements Filter {
private timer: NodeJS.Timer | null = null;
private deadline: number = Infinity;
private deadline = Infinity;
constructor(
private readonly channel: Channel,
private readonly callStream: Call
@ -66,7 +66,7 @@ export class DeadlineFilter extends BaseFilter implements Filter {
clearTimeout(this.timer);
}
const now: number = new Date().getTime();
let timeout = this.deadline - now;
const timeout = this.deadline - now;
if (timeout <= 0) {
process.nextTick(() => {
this.callStream.cancelWithStatus(

View File

@ -1,12 +1,34 @@
export { trace } from './logging';
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig, Duration } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig } from './load-balancer';
export { SubchannelAddress, subchannelAddressToString } from './subchannel';
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';
export { Picker, UnavailablePicker, QueuePicker, PickResult, PickArgs, PickResultType } from './picker';
export { Call as CallStream } from './call-stream';
export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack';
export { trace } from './logging';
export {
Resolver,
ResolverListener,
registerResolver,
ConfigSelector,
} from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { ServiceConfig, Duration } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export {
LoadBalancer,
LoadBalancingConfig,
ChannelControlHelper,
registerLoadBalancerType,
getFirstUsableConfig,
validateLoadBalancingConfig,
} from './load-balancer';
export {
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';
export {
Picker,
UnavailablePicker,
QueuePicker,
PickResult,
PickArgs,
PickResultType,
} from './picker';
export { Call as CallStream } from './call-stream';
export { Filter, BaseFilter, FilterFactory } from './filter';
export { FilterStackFactory } from './filter-stack';

View File

@ -57,8 +57,7 @@ export abstract class BaseFilter implements Filter {
return status;
}
refresh(): void {
}
refresh(): void {}
}
export interface FilterFactory<T extends Filter> {

View File

@ -26,7 +26,7 @@ import {
SubchannelAddress,
isTcpSubchannelAddress,
subchannelAddressToString,
} from './subchannel';
} from './subchannel-address';
import { ChannelOptions } from './channel-options';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { URL } from 'url';
@ -93,7 +93,7 @@ function getProxyInfo(): ProxyInfo {
port = '80';
}
const result: ProxyInfo = {
address: `${hostname}:${port}`
address: `${hostname}:${port}`,
};
if (userCred) {
result.creds = userCred;
@ -147,7 +147,9 @@ export function mapProxyName(
const serverHost = hostPort.host;
for (const host of getNoProxyHostList()) {
if (host === serverHost) {
trace('Not using proxy for target in no_proxy list: ' + uriToString(target));
trace(
'Not using proxy for target in no_proxy list: ' + uriToString(target)
);
return noProxyResult;
}
}
@ -226,7 +228,7 @@ export function getProxiedConnection(
const targetPath = getDefaultAuthority(parsedTarget);
const hostPort = splitHostPort(targetPath);
const remoteHost = hostPort?.host ?? targetPath;
const cts = tls.connect(
{
host: remoteHost,
@ -244,7 +246,13 @@ export function getProxiedConnection(
resolve({ socket: cts, realTarget: parsedTarget });
}
);
cts.on('error', () => {
cts.on('error', (error: Error) => {
trace('Failed to establish a TLS connection to ' +
options.path +
' through proxy ' +
proxyAddressString +
' with error ' +
error.message);
reject();
});
} else {

View File

@ -24,7 +24,8 @@ import {
} from './call';
import { CallCredentials, OAuth2Client } from './call-credentials';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { Channel, ChannelImplementation } from './channel';
import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import {
CallOptions,
@ -182,7 +183,12 @@ export {
/**** Server ****/
export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall, handleClientStreamingCall };
export {
handleBidiStreamingCall,
handleServerStreamingCall,
handleUnaryCall,
handleClientStreamingCall,
};
/* eslint-disable @typescript-eslint/no-explicit-any */
export type Call =
@ -246,10 +252,16 @@ export { ChannelOptions } from './channel-options';
import * as experimental from './experimental';
export { experimental };
import * as resolver from './resolver';
import * as load_balancer from './load-balancer';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import * as resolver_ip from './resolver-ip';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
(() => {
resolver.registerAll();
load_balancer.registerAll();
resolver_dns.setup();
resolver_uds.setup();
resolver_ip.setup();
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
})();

View File

@ -18,12 +18,13 @@
import {
LoadBalancer,
ChannelControlHelper,
LoadBalancingConfig,
createLoadBalancer,
LoadBalancingConfig
} from './load-balancer';
import { SubchannelAddress, Subchannel } from './subchannel';
import { Subchannel } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { ChannelOptions } from './channel-options';
import { ConnectivityState } from './channel';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
const TYPE_NAME = 'child_load_balancer_helper';

View File

@ -18,10 +18,11 @@
import {
LoadBalancer,
ChannelControlHelper,
LoadBalancingConfig,
registerDefaultLoadBalancerType,
registerLoadBalancerType,
LoadBalancingConfig
} from './load-balancer';
import { ConnectivityState } from './channel';
import { ConnectivityState } from './connectivity-state';
import {
QueuePicker,
Picker,
@ -30,12 +31,11 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import {
Subchannel,
ConnectivityStateListener,
SubchannelAddress,
subchannelAddressToString,
} from './subchannel';
} from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -62,10 +62,11 @@ export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
toJsonObject(): object {
return {
[TYPE_NAME]: {}
[TYPE_NAME]: {},
};
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
static createFromJson(obj: any) {
return new PickFirstLoadBalancingConfig();
}
@ -457,5 +458,10 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
export function setup(): void {
registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer, PickFirstLoadBalancingConfig);
registerLoadBalancerType(
TYPE_NAME,
PickFirstLoadBalancer,
PickFirstLoadBalancingConfig
);
registerDefaultLoadBalancerType(TYPE_NAME);
}

View File

@ -18,10 +18,10 @@
import {
LoadBalancer,
ChannelControlHelper,
LoadBalancingConfig,
registerLoadBalancerType,
LoadBalancingConfig
} from './load-balancer';
import { ConnectivityState } from './channel';
import { ConnectivityState } from './connectivity-state';
import {
QueuePicker,
Picker,
@ -30,12 +30,11 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Subchannel, ConnectivityStateListener } from './subchannel';
import {
Subchannel,
ConnectivityStateListener,
SubchannelAddress,
subchannelAddressToString,
} from './subchannel';
} from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
@ -56,10 +55,11 @@ class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
toJsonObject(): object {
return {
[TYPE_NAME]: {}
[TYPE_NAME]: {},
};
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
static createFromJson(obj: any) {
return new RoundRobinLoadBalancingConfig();
}
@ -128,7 +128,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.subchannelStateCounts[previousState] -= 1;
this.subchannelStateCounts[newState] += 1;
this.calculateAndUpdateState();
if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
@ -247,5 +247,9 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, RoundRobinLoadBalancer, RoundRobinLoadBalancingConfig);
registerLoadBalancerType(
TYPE_NAME,
RoundRobinLoadBalancer,
RoundRobinLoadBalancingConfig
);
}

View File

@ -16,11 +16,10 @@
*/
import { ChannelOptions } from './channel-options';
import { Subchannel, SubchannelAddress } from './subchannel';
import { ConnectivityState } from './channel';
import { Subchannel } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
/**
* A collection of functions associated with a channel that a load balancer
@ -102,17 +101,21 @@ export interface LoadBalancingConfig {
}
export interface LoadBalancingConfigConstructor {
new(...args: any): LoadBalancingConfig;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any): LoadBalancingConfig;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
createFromJson(obj: any): LoadBalancingConfig;
}
const registeredLoadBalancerTypes: {
[name: string]: {
LoadBalancer: LoadBalancerConstructor,
LoadBalancingConfig: LoadBalancingConfigConstructor
LoadBalancer: LoadBalancerConstructor;
LoadBalancingConfig: LoadBalancingConfigConstructor;
};
} = {};
let defaultLoadBalancerType: string | null = null;
export function registerLoadBalancerType(
typeName: string,
loadBalancerType: LoadBalancerConstructor,
@ -120,17 +123,23 @@ export function registerLoadBalancerType(
) {
registeredLoadBalancerTypes[typeName] = {
LoadBalancer: loadBalancerType,
LoadBalancingConfig: loadBalancingConfigType
LoadBalancingConfig: loadBalancingConfigType,
};
}
export function registerDefaultLoadBalancerType(typeName: string) {
defaultLoadBalancerType = typeName;
}
export function createLoadBalancer(
config: LoadBalancingConfig,
channelControlHelper: ChannelControlHelper
): LoadBalancer | null {
const typeName = config.getLoadBalancerName();
if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName].LoadBalancer(channelControlHelper);
return new registeredLoadBalancerTypes[typeName].LoadBalancer(
channelControlHelper
);
} else {
return null;
}
@ -140,40 +149,49 @@ export function isLoadBalancerNameRegistered(typeName: string): boolean {
return typeName in registeredLoadBalancerTypes;
}
export function getFirstUsableConfig(configs: LoadBalancingConfig[], defaultPickFirst?: true): LoadBalancingConfig;
export function getFirstUsableConfig(
configs: LoadBalancingConfig[],
defaultPickFirst: boolean = false
fallbackTodefault?: true
): LoadBalancingConfig;
export function getFirstUsableConfig(
configs: LoadBalancingConfig[],
fallbackTodefault = false
): LoadBalancingConfig | null {
for (const config of configs) {
if (config.getLoadBalancerName() in registeredLoadBalancerTypes) {
return config;
}
}
if (defaultPickFirst) {
return new load_balancer_pick_first.PickFirstLoadBalancingConfig()
if (fallbackTodefault) {
if (defaultLoadBalancerType) {
return new registeredLoadBalancerTypes[
defaultLoadBalancerType
]!.LoadBalancingConfig();
} else {
return null;
}
} else {
return null;
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function validateLoadBalancingConfig(obj: any): LoadBalancingConfig {
if (!(obj !== null && (typeof obj === 'object'))) {
if (!(obj !== null && typeof obj === 'object')) {
throw new Error('Load balancing config must be an object');
}
const keys = Object.keys(obj);
if (keys.length !== 1) {
throw new Error('Provided load balancing config has multiple conflicting entries');
throw new Error(
'Provided load balancing config has multiple conflicting entries'
);
}
const typeName = keys[0];
if (typeName in registeredLoadBalancerTypes) {
return registeredLoadBalancerTypes[typeName].LoadBalancingConfig.createFromJson(obj[typeName]);
return registeredLoadBalancerTypes[
typeName
].LoadBalancingConfig.createFromJson(obj[typeName]);
} else {
throw new Error(`Unrecognized load balancing config name ${typeName}`);
}
}
export function registerAll() {
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
}

View File

@ -20,9 +20,10 @@ import { LogVerbosity } from './constants';
let _logger: Partial<Console> = console;
let _logVerbosity: LogVerbosity = LogVerbosity.ERROR;
const verbosityString = process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? '';
const verbosityString =
process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? '';
switch (verbosityString) {
switch (verbosityString.toUpperCase()) {
case 'DEBUG':
_logVerbosity = LogVerbosity.DEBUG;
break;
@ -32,6 +33,9 @@ switch (verbosityString) {
case 'ERROR':
_logVerbosity = LogVerbosity.ERROR;
break;
case 'NONE':
_logVerbosity = LogVerbosity.NONE;
break;
default:
// Ignore any other values
}
@ -55,16 +59,28 @@ export const log = (severity: LogVerbosity, ...args: any[]): void => {
}
};
const tracersString = process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? '';
const enabledTracers = tracersString.split(',');
const allEnabled = enabledTracers.includes('all');
const tracersString =
process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? '';
const enabledTracers = new Set<string>();
const disabledTracers = new Set<string>();
for (const tracerName of tracersString.split(',')) {
if (tracerName.startsWith('-')) {
disabledTracers.add(tracerName.substring(1));
} else {
enabledTracers.add(tracerName);
}
}
const allEnabled = enabledTracers.has('all');
export function trace(
severity: LogVerbosity,
tracer: string,
text: string
): void {
if (allEnabled || enabledTracers.includes(tracer)) {
if (
!disabledTracers.has(tracer) &&
(allEnabled || enabledTracers.has(tracer))
) {
log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text);
}
}

View File

@ -98,7 +98,7 @@ export interface ServiceClientConstructor {
* keys.
* @param key key for check, string.
*/
function isPrototypePolluted(key: string): Boolean {
function isPrototypePolluted(key: string): boolean {
return ['__proto__', 'prototype', 'constructor'].includes(key);
}

View File

@ -15,10 +15,14 @@
*
*/
import { BaseFilter, Filter, FilterFactory } from "./filter";
import { Call, WriteObject } from "./call-stream";
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from "./constants";
import { ChannelOptions } from "./channel-options";
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Call, WriteObject } from './call-stream';
import {
Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
} from './constants';
import { ChannelOptions } from './channel-options';
export class MaxMessageSizeFilter extends BaseFilter implements Filter {
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
@ -44,7 +48,10 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter {
} else {
const concreteMessage = await message;
if (concreteMessage.message.length > this.maxSendMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`);
this.callStream.cancelWithStatus(
Status.RESOURCE_EXHAUSTED,
`Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`
);
return Promise.reject<WriteObject>('Message too large');
} else {
return concreteMessage;
@ -60,7 +67,10 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter {
} else {
const concreteMessage = await message;
if (concreteMessage.length > this.maxReceiveMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`);
this.callStream.cancelWithStatus(
Status.RESOURCE_EXHAUSTED,
`Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`
);
return Promise.reject<Buffer>('Message too large');
} else {
return concreteMessage;
@ -69,7 +79,8 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter {
}
}
export class MaxMessageSizeFilterFactory implements FilterFactory<MaxMessageSizeFilter> {
export class MaxMessageSizeFilterFactory
implements FilterFactory<MaxMessageSizeFilter> {
constructor(private readonly options: ChannelOptions) {}
createFilter(callStream: Call): MaxMessageSizeFilter {

View File

@ -247,7 +247,7 @@ export class Metadata {
* representation of the metadata map.
*/
toJSON() {
const result: {[key: string]: MetadataValue[]} = {};
const result: { [key: string]: MetadataValue[] } = {};
for (const [key, values] of this.internalRepr.entries()) {
result[key] = values;
}

View File

@ -85,7 +85,7 @@ export interface DropCallPickResult extends PickResult {
export interface PickArgs {
metadata: Metadata;
extraPickInfo: {[key: string]: string};
extraPickInfo: { [key: string]: string };
}
/**

View File

@ -28,7 +28,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel';
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString, splitHostPort } from './uri-parser';
import { isIPv6, isIPv4 } from 'net';
import { ChannelOptions } from './channel-options';
@ -129,7 +129,13 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {});
this.listener.onSuccessfulResolution(
this.ipResult!,
null,
null,
null,
{}
);
});
return;
}

View File

@ -0,0 +1,116 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { isIPv4, isIPv6 } from 'net';
import { StatusObject } from './call-stream';
import { ChannelOptions } from './channel-options';
import { LogVerbosity, Status } from './constants';
import { Metadata } from './metadata';
import { registerResolver, Resolver, ResolverListener } from './resolver';
import { SubchannelAddress } from './subchannel-address';
import { GrpcUri, splitHostPort, uriToString } from './uri-parser';
import * as logging from './logging';
const TRACER_NAME = 'ip_resolver';
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
const IPV4_SCHEME = 'ipv4';
const IPV6_SCHEME = 'ipv6';
/**
* The default TCP port to connect to if not explicitly specified in the target.
*/
const DEFAULT_PORT = 443;
class IpResolver implements Resolver {
private addresses: SubchannelAddress[] = [];
private error: StatusObject | null = null;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
channelOptions: ChannelOptions
) {
trace('Resolver constructed for target ' + uriToString(target));
const addresses: SubchannelAddress[] = [];
if (!(target.scheme === IPV4_SCHEME || target.scheme === IPV6_SCHEME)) {
this.error = {
code: Status.UNAVAILABLE,
details: `Unrecognized scheme ${target.scheme} in IP resolver`,
metadata: new Metadata(),
};
return;
}
const pathList = target.path.split(',');
for (const path of pathList) {
const hostPort = splitHostPort(path);
if (hostPort === null) {
this.error = {
code: Status.UNAVAILABLE,
details: `Failed to parse ${target.scheme} address ${path}`,
metadata: new Metadata(),
};
return;
}
if (
(target.scheme === IPV4_SCHEME && !isIPv4(hostPort.host)) ||
(target.scheme === IPV6_SCHEME && !isIPv6(hostPort.host))
) {
this.error = {
code: Status.UNAVAILABLE,
details: `Failed to parse ${target.scheme} address ${path}`,
metadata: new Metadata(),
};
return;
}
addresses.push({
host: hostPort.host,
port: hostPort.port ?? DEFAULT_PORT,
});
}
this.addresses = addresses;
trace('Parsed ' + target.scheme + ' address list ' + this.addresses);
}
updateResolution(): void {
process.nextTick(() => {
if (this.error) {
this.listener.onError(this.error);
} else {
this.listener.onSuccessfulResolution(
this.addresses,
null,
null,
null,
{}
);
}
});
}
destroy(): void {
// This resolver owns no resources, so we do nothing here.
}
static getDefaultAuthority(target: GrpcUri): string {
return target.path.split(',')[0];
}
}
export function setup() {
registerResolver(IPV4_SCHEME, IpResolver);
registerResolver(IPV6_SCHEME, IpResolver);
}

View File

@ -15,7 +15,7 @@
*/
import { Resolver, ResolverListener, registerResolver } from './resolver';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';

View File

@ -16,10 +16,8 @@
*/
import { MethodConfig, ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { Metadata } from './metadata';
@ -29,7 +27,7 @@ import { Filter, FilterFactory } from './filter';
export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
pickInformation: {[key: string]: string};
pickInformation: { [key: string]: string };
status: Status;
dynamicFilterFactories: FilterFactory<Filter>[];
}
@ -82,7 +80,7 @@ export interface Resolver {
* called synchronously with the constructor or updateResolution.
*/
updateResolution(): void;
/**
* Destroy the resolver. Should be called when the owning channel shuts down.
*/
@ -177,8 +175,3 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null {
}
return target;
}
export function registerAll() {
resolver_dns.setup();
resolver_uds.setup();
}

View File

@ -18,11 +18,11 @@
import {
ChannelControlHelper,
LoadBalancer,
LoadBalancingConfig,
getFirstUsableConfig,
LoadBalancingConfig
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { ConnectivityState } from './connectivity-state';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
@ -32,7 +32,7 @@ import { StatusObject } from './call-stream';
import { Metadata } from './metadata';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { SubchannelAddress } from './subchannel';
import { SubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ChannelOptions } from './channel-options';
@ -46,15 +46,23 @@ function trace(text: string): void {
const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';
function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector {
return function defaultConfigSelector(methodName: string, metadata: Metadata) {
const splitName = methodName.split('/').filter(x => x.length > 0);
function getDefaultConfigSelector(
serviceConfig: ServiceConfig | null
): ConfigSelector {
return function defaultConfigSelector(
methodName: string,
metadata: Metadata
) {
const splitName = methodName.split('/').filter((x) => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (name.service === service && (name.method === undefined || name.method === method)) {
if (
name.service === service &&
(name.method === undefined || name.method === method)
) {
return {
methodConfig: methodConfig,
pickInformation: {},
@ -66,12 +74,12 @@ function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSe
}
}
return {
methodConfig: {name: []},
methodConfig: { name: [] },
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: []
};
}
};
}
export interface ResolutionCallback {
@ -203,7 +211,10 @@ export class ResolvingLoadBalancer implements LoadBalancer {
}
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = getFirstUsableConfig(workingConfigList, true);
const loadBalancingConfig = getFirstUsableConfig(
workingConfigList,
true
);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
@ -219,8 +230,11 @@ export class ResolvingLoadBalancer implements LoadBalancer {
loadBalancingConfig,
attributes
);
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig));
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);

View File

@ -100,7 +100,8 @@ export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
ObjectReadable<RequestType> &
ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
export class ServerUnaryCallImpl<RequestType, ResponseType>
extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> {
cancelled: boolean;
@ -239,7 +240,8 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
}
}
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
export class ServerDuplexStreamImpl<RequestType, ResponseType>
extends Duplex
implements ServerDuplexStream<RequestType, ResponseType> {
cancelled: boolean;
private trailingMetadata: Metadata;

View File

@ -56,7 +56,7 @@ import {
TcpSubchannelAddress,
isTcpSubchannelAddress,
subchannelAddressToString,
} from './subchannel';
} from './subchannel-address';
import { parseUri } from './uri-parser';
const TRACER_NAME = 'server';
@ -209,10 +209,7 @@ export class Server {
}
removeService(service: ServiceDefinition): void {
if (
service === null ||
typeof service !== 'object'
) {
if (service === null || typeof service !== 'object') {
throw new Error('removeService() requires object as argument');
}
@ -258,10 +255,12 @@ export class Server {
}
const serverOptions: http2.ServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
if ('grpc-node.max_session_memory' in this.options) {
serverOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
serverOptions.maxSessionMemory = this.options[
'grpc-node.max_session_memory'
];
}
if ('grpc.max_concurrent_streams' in this.options) {
serverOptions.settings = {

View File

@ -27,7 +27,10 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as os from 'os';
import { LoadBalancingConfig, validateLoadBalancingConfig } from './load-balancer';
import {
LoadBalancingConfig,
validateLoadBalancingConfig,
} from './load-balancer';
export interface MethodConfigName {
service: string;
@ -107,21 +110,30 @@ function validateMethodConfig(obj: any): MethodConfig {
}
if ('timeout' in obj) {
if (typeof obj.timeout === 'object') {
if (!('seconds' in obj.timeout) || !(typeof obj.timeout.seconds === 'number')) {
if (
!('seconds' in obj.timeout) ||
!(typeof obj.timeout.seconds === 'number')
) {
throw new Error('Invalid method config: invalid timeout.seconds');
}
if (!('nanos' in obj.timeout) || !(typeof obj.timeout.nanos === 'number')) {
if (
!('nanos' in obj.timeout) ||
!(typeof obj.timeout.nanos === 'number')
) {
throw new Error('Invalid method config: invalid timeout.nanos');
}
result.timeout = obj.timeout;
} else if (
(typeof obj.timeout === 'string') && TIMEOUT_REGEX.test(obj.timeout)
typeof obj.timeout === 'string' &&
TIMEOUT_REGEX.test(obj.timeout)
) {
const timeoutParts = obj.timeout.substring(0, obj.timeout.length - 1).split('.');
const timeoutParts = obj.timeout
.substring(0, obj.timeout.length - 1)
.split('.');
result.timeout = {
seconds: timeoutParts[0] | 0,
nanos: (timeoutParts[1] ?? 0) | 0
}
nanos: (timeoutParts[1] ?? 0) | 0,
};
} else {
throw new Error('Invalid method config: invalid timeout');
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
export interface TcpSubchannelAddress {
port: number;
host: string;
}
export interface IpcSubchannelAddress {
path: string;
}
/**
* This represents a single backend address to connect to. This interface is a
* subset of net.SocketConnectOpts, i.e. the options described at
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener.
* Those are in turn a subset of the options that can be passed to http2.connect.
*/
export type SubchannelAddress = TcpSubchannelAddress | IpcSubchannelAddress;
export function isTcpSubchannelAddress(
address: SubchannelAddress
): address is TcpSubchannelAddress {
return 'port' in address;
}
export function subchannelAddressEqual(
address1: SubchannelAddress,
address2: SubchannelAddress
): boolean {
if (isTcpSubchannelAddress(address1)) {
return (
isTcpSubchannelAddress(address2) &&
address1.host === address2.host &&
address1.port === address2.port
);
} else {
return !isTcpSubchannelAddress(address2) && address1.path === address2.path;
}
}
export function subchannelAddressToString(address: SubchannelAddress): string {
if (isTcpSubchannelAddress(address)) {
return address.host + ':' + address.port;
} else {
return address.path;
}
}

View File

@ -16,11 +16,11 @@
*/
import { ChannelOptions, channelOptionsEqual } from './channel-options';
import { Subchannel } from './subchannel';
import {
Subchannel,
SubchannelAddress,
subchannelAddressEqual,
} from './subchannel';
} from './subchannel-address';
import { ChannelCredentials } from './channel-credentials';
import { GrpcUri, uriToString } from './uri-parser';

View File

@ -21,7 +21,7 @@ import { Metadata } from './metadata';
import { Http2CallStream } from './call-stream';
import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity } from 'tls';
import { ConnectivityState } from './channel';
import { ConnectivityState } from './connectivity-state';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import { getDefaultAuthority } from './resolver';
import * as logging from './logging';
@ -31,6 +31,10 @@ import * as net from 'net';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter } from './filter';
import {
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
const clientVersion = require('../../package.json').version;
@ -82,52 +86,6 @@ function uniformRandom(min: number, max: number) {
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
export interface TcpSubchannelAddress {
port: number;
host: string;
}
export interface IpcSubchannelAddress {
path: string;
}
/**
* This represents a single backend address to connect to. This interface is a
* subset of net.SocketConnectOpts, i.e. the options described at
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener.
* Those are in turn a subset of the options that can be passed to http2.connect.
*/
export type SubchannelAddress = TcpSubchannelAddress | IpcSubchannelAddress;
export function isTcpSubchannelAddress(
address: SubchannelAddress
): address is TcpSubchannelAddress {
return 'port' in address;
}
export function subchannelAddressEqual(
address1: SubchannelAddress,
address2: SubchannelAddress
): boolean {
if (isTcpSubchannelAddress(address1)) {
return (
isTcpSubchannelAddress(address2) &&
address1.host === address2.host &&
address1.port === address2.port
);
} else {
return !isTcpSubchannelAddress(address2) && address1.path === address2.path;
}
}
export function subchannelAddressToString(address: SubchannelAddress): string {
if (isTcpSubchannelAddress(address)) {
return address.host + ':' + address.port;
} else {
return address.path;
}
}
export class Subchannel {
/**
* The subchannel's current connectivity state. Invariant: `session` === `null`
@ -183,7 +141,7 @@ export class Subchannel {
/**
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveWithoutCalls: boolean = false;
private keepaliveWithoutCalls = false;
/**
* Tracks calls with references to this subchannel
@ -231,7 +189,8 @@ export class Subchannel {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
if ('grpc.keepalive_permit_without_calls' in options) {
this.keepaliveWithoutCalls = options['grpc.keepalive_permit_without_calls'] === 1;
this.keepaliveWithoutCalls =
options['grpc.keepalive_permit_without_calls'] === 1;
} else {
this.keepaliveWithoutCalls = false;
}
@ -276,10 +235,15 @@ export class Subchannel {
}
private sendPing() {
logging.trace(LogVerbosity.DEBUG, 'keepalive', 'Sending ping to ' + this.subchannelAddressString);
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
'Sending ping to ' + this.subchannelAddressString
);
this.keepaliveTimeoutId = setTimeout(() => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
clearTimeout(this.keepaliveTimeoutId);
@ -291,6 +255,7 @@ export class Subchannel {
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
this.keepaliveIntervalId.unref?.();
/* Don't send a ping immediately because whatever caused us to start
* sending pings should also involve some network activity. */
}
@ -308,7 +273,9 @@ export class Subchannel {
this.credentials._getConnectionOptions() || {};
connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
if ('grpc-node.max_session_memory' in this.options) {
connectionOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
connectionOptions.maxSessionMemory = this.options[
'grpc-node.max_session_memory'
];
}
let addressScheme = 'http://';
if ('secureContext' in connectionOptions) {
@ -430,7 +397,11 @@ export class Subchannel {
);
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms`
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTimeMs
} ms`
);
}
trace(
@ -596,11 +567,7 @@ export class Subchannel {
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.transitionToState(
[
ConnectivityState.CONNECTING,
ConnectivityState.IDLE,
ConnectivityState.READY,
],
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
}
@ -719,7 +686,14 @@ export class Subchannel {
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
}
logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString);
logging.trace(
LogVerbosity.DEBUG,
'call_stream',
'Starting stream on subchannel ' +
this.subchannelAddressString +
' with headers\n' +
headersString
);
callStream.attachHttp2Stream(http2Stream, this, extraFilters);
}

View File

@ -20,7 +20,7 @@ import * as assert from 'assert';
import * as grpc from '../src';
import { Server, ServerCredentials } from '../src';
import { Client } from '../src';
import { ConnectivityState } from '../src/channel';
import { ConnectivityState } from "../src/connectivity-state";
const clientInsecureCreds = grpc.credentials.createInsecure();
const serverInsecureCreds = ServerCredentials.createInsecure();

View File

@ -19,18 +19,23 @@
// tslint:disable no-any
import * as assert from 'assert';
import * as resolverManager from '../src/resolver';
import * as resolver_dns from '../src/resolver-dns';
import * as resolver_uds from '../src/resolver-uds';
import * as resolver_ip from '../src/resolver-ip';
import { ServiceConfig } from '../src/service-config';
import { StatusObject } from '../src/call-stream';
import { SubchannelAddress, isTcpSubchannelAddress } from '../src/subchannel';
import { SubchannelAddress, isTcpSubchannelAddress } from "../src/subchannel-address";
import { parseUri, GrpcUri } from '../src/uri-parser';
describe('Name Resolver', () => {
before(() => {
resolver_dns.setup();
resolver_uds.setup();
resolver_ip.setup();
});
describe('DNS Names', function() {
// For some reason DNS queries sometimes take a long time on Windows
this.timeout(4000);
before(() => {
resolverManager.registerAll();
});
it('Should resolve localhost properly', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('localhost:50051')!)!;
const listener: resolverManager.ResolverListener = {
@ -388,6 +393,186 @@ describe('Name Resolver', () => {
resolver.updateResolution();
});
});
describe('IP Addresses', () => {
it('should handle one IPv4 address with no port', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 443
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('should handle one IPv4 address with a port', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1:50051')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 50051
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('should handle multiple IPv4 addresses with different ports', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1:50051,127.0.0.1:50052')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 50051
)
);
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '127.0.0.1' &&
addr.port === 50052
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('should handle one IPv6 address with no port', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:::1')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 443
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('should handle one IPv6 address with a port', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:[::1]:50051')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 50051
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it('should handle multiple IPv6 addresses with different ports', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:[::1]:50051,[::1]:50052')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
// Only handle the first resolution result
listener.onSuccessfulResolution = () => {};
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 50051
)
);
assert(
addressList.some(
addr =>
isTcpSubchannelAddress(addr) &&
addr.host === '::1' &&
addr.port === 50052
)
);
done();
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
});
describe('getDefaultAuthority', () => {
class OtherResolver implements resolverManager.Resolver {
updateResolution() {

View File

@ -1,6 +1,6 @@
{
"name": "grpc-tools",
"version": "1.11.1",
"version": "1.11.2",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "https://grpc.io/",
@ -24,7 +24,7 @@
"prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js"
},
"dependencies": {
"node-pre-gyp": "^0.15.0"
"@mapbox/node-pre-gyp": "^1.0.5"
},
"binary": {
"module_name": "grpc_tools",

View File

@ -579,6 +579,34 @@ function generateServiceInterfaces(formatter: TextFormatter, serviceType: Protob
generateServiceDefinitionInterface(formatter, serviceType);
}
function containsDefinition(definitionType: typeof Protobuf.Type | typeof Protobuf.Enum, namespace: Protobuf.NamespaceBase): boolean {
for (const nested of namespace.nestedArray.sort(compareName)) {
if (nested instanceof definitionType) {
return true;
} else if (isNamespaceBase(nested) && !(nested instanceof Protobuf.Type) && !(nested instanceof Protobuf.Enum) && containsDefinition(definitionType, nested)) {
return true;
}
}
return false;
}
function generateDefinitionImports(formatter: TextFormatter, namespace: Protobuf.NamespaceBase, options: GeneratorOptions) {
const imports = [];
if (containsDefinition(Protobuf.Enum, namespace)) {
imports.push('EnumTypeDefinition');
}
if (containsDefinition(Protobuf.Type, namespace)) {
imports.push('MessageTypeDefinition');
}
if (imports.length) {
formatter.writeLine(`import type { ${imports.join(', ')} } from '@grpc/proto-loader';`);
}
}
function generateServiceImports(formatter: TextFormatter, namespace: Protobuf.NamespaceBase, options: GeneratorOptions) {
for (const nested of namespace.nestedArray.sort(compareName)) {
if (nested instanceof Protobuf.Service) {
@ -617,7 +645,7 @@ function generateLoadedDefinitionTypes(formatter: TextFormatter, namespace: Prot
function generateRootFile(formatter: TextFormatter, root: Protobuf.Root, options: GeneratorOptions) {
formatter.writeLine(`import type * as grpc from '${options.grpcLib}';`);
formatter.writeLine("import type { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';");
generateDefinitionImports(formatter, root, options);
formatter.writeLine('');
generateServiceImports(formatter, root, options);

View File

@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { ServiceDefinition, EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import type { EnumTypeDefinition, MessageTypeDefinition } from '@grpc/proto-loader';
import type { OperationsClient as _google_longrunning_OperationsClient, OperationsDefinition as _google_longrunning_OperationsDefinition } from './google/longrunning/Operations';
import type { EchoClient as _google_showcase_v1beta1_EchoClient, EchoDefinition as _google_showcase_v1beta1_EchoDefinition } from './google/showcase/v1beta1/Echo';

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/proto-loader",
"version": "0.6.2",
"version": "0.6.4",
"author": "Google Inc.",
"contributors": [
{

View File

@ -16,7 +16,7 @@
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds.sh"
timeout_mins: 180
timeout_mins: 360
action {
define_artifacts {
regex: "github/grpc/reports/**"

View File

@ -16,7 +16,7 @@
# Location of the continuous shell script in repository.
build_file: "grpc-node/packages/grpc-js-xds/scripts/xds-v3.sh"
timeout_mins: 180
timeout_mins: 360
action {
define_artifacts {
regex: "github/grpc/reports/**"

View File

@ -6,7 +6,6 @@ RUN sed -i '/deb http:\/\/deb.debian.org\/debian jessie-updates main/d' /etc/apt
RUN apt-get update
RUN apt-get -t jessie-backports install -y cmake
RUN apt-get install -y curl build-essential python libc6-dev-i386 lib32stdc++-4.9-dev jq
RUN curl -fsSL get.docker.com | bash
RUN mkdir /usr/local/nvm
ENV NVM_DIR /usr/local/nvm